jjzjj

并发专题-自己动手实现一个线程池

pq217 2023-10-06 原文

前言

本文主要参照线程池ThreadPoolExecutor的实现方式自己写一个线程池,主要是因为ThreadPoolExecutor的源码读起来还是挺费劲,想通过自己仿写的方式加深理解

首先要了解ThreadPoolExecutor线程池的工作机制,不明白的看这里

初步思路

需要解决的问题
  • 线程池的主要作用是保存限制数量的线程,当有执行任务时,从中选择某个线程去执行,而不是来个任务就new一个Thread,鉴于thread.start()执行完成之后就会销毁,所以如何保持线程不销毁是个关键,解决思路有很多,比如可以给线程这样一个长期运行任务:当用用户任务到达时执行用户任务,没有用户任务时wait,用户任务出现时再notify唤醒执行,这样线程就不会被销毁掉
  • 线程池在所有线程都工作的情况下,依然可以存储任务,等线程池某个线程空闲时,任务就会被执行
解决方案

结合以上俩点,最终解决方案如下:
使用阻塞队列BlockingQueue来保存用户任务,好处在于获取时如果取不到就会一直阻塞等待,而每个线程执行的实际任务尝试获取任务队列中的用户任务,取不到就等待,取到了就执行,执行完之后再回来等待,形成一个死循环,线程执行的就是一个永久性任务,不会被销毁(等待的过程阻塞也不会占用CPU)

初步实现

首先线程池的本质是Executor,所以我们也按照规范继承并实现execute方法

public class MyThreadPool implements Executor
核心属性

定义一个基本参数corePoolSize来确定核心线程数大小,一个BlockingQueue来保存任务队列

public class MyThreadPool implements Executor {
    // 核心线程数大小
    private int corePoolSize = 10;
    // 阻塞任务队列
    private BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(10);

    /**
     * 执行任务的方法,todo 待实现
     * @param command
     */
    @Override
    public void execute(Runnable command) {
    }
}
worker

接下来就要开始定义线程池中的工作者,每个工作者有自己的任务-完成第一个分配的工作并去任务队列取新任务做,由于是线程池,所以每个工作者都必须有一个工位即Thread,接下来实现这个工作者

// 拥有工作任务的工作者
private final class Worker implements Runnable {
    // 线程
    Thread thread;
    // 第一个任务
    Runnable firstTask;
    // 工作者初始化
    Worker(Runnable task) {
        // 保存第一个任务
        firstTask = task;
        // 初始化工作台
        thread = new Thread(this);
    }

    // 工作者的任务计划
    @Override
    public void run() {
        Runnable task = firstTask;
        firstTask = null;
        // 第一个任务未执行执行第一个,执行完后去队列获取任务,执行结束后再次尝试获取
        while (task != null || (task = getTask()) != null) {
            try {
                task.run();
            } finally {
                // 当前任务置空重新处理
                task = null;
            }
        }
    }

    // 从队列获取任务
    private Runnable getTask() {
        try {
            return workQueue.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
            return null;
        }
    }
}
执行任务

接下来就是执行外界任务的核心逻辑,首先用一个集合存放所有工作者

private final HashSet<Worker> workers = new HashSet<Worker>();

执行任务的逻辑,小于corePoolSize即创建一个新工作者执行任务,否则放入任务对列,等待被空闲的工作者执行,队列满了直接拒绝任务

@Override
public void execute(Runnable command) {
    // 如果工作线程少于corePoolSize,新增线程执行
    if (workers.size() < corePoolSize) {
        addWorker(command);
        return;
    }
    // 否则添加到任务队列
    if (workQueue.offer(command)) {
        return;
    }
    // 任务队列满了:拒绝
    throw new RejectedExecutionException();
}

其中addWorker就是new一个Worker,并启动工作

/**
 * 添加工作者
 * @param command
 */
private void addWorker(Runnable command) {
    Worker w = new Worker(command);
    final Thread t = w.thread;
    t.start();
    workers.add(w);
}
线程工厂

上面的线程生成有个问题,即线程的命名是默认的没有规律的,如果有多个线程池,调试时完全分不清哪些线程是干什么活产生的,因此我们把new Thread的创建方式修改为工厂模式创建方式,并把让其可配,工厂接口Dong Li都定义好了,直接用

public class MyThreadPool implements Executor {

    private int corePoolSize;

    private BlockingQueue<Runnable> workQueue;

    private ThreadFactory threadFactory;

    public MyThreadPool3(int corePoolSize, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        this.corePoolSize = corePoolSize;
        this.workQueue = workQueue;
        this.threadFactory = threadFactory;
    }

这样再worker初始化是,创建线程的方式变为

// 工作者初始化
Worker(Runnable task) {
    // 保存第一个任务
    firstTask = task;
    // 初始化工作台,使用工厂创建
    thread = threadFactory.newThread(this);
}
测试

这样,一个简单非线程安全的线程池就做完了,完整如下

public class MyThreadPool3 implements Executor {

    private int corePoolSize;

    private BlockingQueue<Runnable> workQueue;

    private ThreadFactory threadFactory;

    public MyThreadPool3(int corePoolSize, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        this.corePoolSize = corePoolSize;
        this.workQueue = workQueue;
        this.threadFactory = threadFactory;
    }

    // 保存所有的worker
    private final HashSet<Worker> workers = new HashSet<Worker>();

    @Override
    public void execute(Runnable command) {
        // 如果工作线程少于corePoolSize,新增线程执行
        if (workers.size() < corePoolSize) {
            addWorker(command);
            return;
        }
        // 否则添加到任务队列
        if (workQueue.offer(command)) {
            return;
        }
        // 任务队列满了:拒绝
        throw new RejectedExecutionException();
    }

    /**
     * 添加工作者
     * @param command
     */
    private void addWorker(Runnable command) {
        Worker w = new Worker(command);
        final Thread t = w.thread;
        t.start();
        workers.add(w);
    }

    // 拥有工作任务的工作者
    private final class Worker implements Runnable {
        // 线程
        Thread thread;
        // 第一个任务
        Runnable firstTask;
        // 工作者初始化
        Worker(Runnable task) {
            // 保存第一个任务
            firstTask = task;
            // 初始化工作台
            thread = threadFactory.newThread(this);
        }

        // 工作者的任务计划
        @Override
        public void run() {
            Runnable task = firstTask;
            firstTask = null;
            // 第一个任务未执行执行第一个,执行完后去队列获取任务,执行结束后再次尝试获取
            while (task != null || (task = getTask()) != null) {
                try {
                    task.run();
                } finally {
                    // 当前任务置空重新处理
                    task = null;
                }
            }
        }

        // 从队列获取任务
        private Runnable getTask() {
            try {
                return workQueue.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
                return null;
            }
        }
    }
}

测试一下是否好用,创建一个核心线程数是10,任务队列容量10的线程池,执行15个3秒的任务

public static void main(String[] args) {
    ThreadFactory threadFactory = new ThreadFactory() {
        private AtomicInteger no = new AtomicInteger(0);
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "my-thread-"+(no.incrementAndGet()));
        }
    };
    Executor poolExecutor = new MyThreadPool3(10, new LinkedBlockingQueue<>(10), threadFactory);
    for(int i=1;i<=15;i++) {
        int finalI = i;
        Runnable task = () -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+"-----任务["+finalI+"]done");
        };
        poolExecutor.execute(task);
    }
}

3秒左右第一波任务干完,输出如下

my-thread-1-----任务[1]done
my-thread-10-----任务[10]done
my-thread-9-----任务[9]done
my-thread-2-----任务[2]done
my-thread-3-----任务[3]done
my-thread-5-----任务[5]done
my-thread-4-----任务[4]done
my-thread-8-----任务[8]done
my-thread-7-----任务[7]done
my-thread-6-----任务[6]done

再过3秒左右,第二波任务干完,输出如下

my-thread-1-----任务[11]done
my-thread-9-----任务[13]done
my-thread-2-----任务[14]done
my-thread-10-----任务[12]done
my-thread-3-----任务[15]done

可以看到我们的简单线程池基本用起来没问题,达到了固定线程数分担所有任务的目的

增加maximumPoolSize

作用

ThreadPoolExecutor还有一个重要参数就是增加maximumPoolSize,代表最大可运行的线程数量,worker数超过了corePoolSize,但没超过maximumPoolSize且任务队列已满的情况下,会创建临时线程,这些临时线程完成初始任务后,会尝试去队列里获取新线程,超过了指定存活时间没有获取到任务,临时线程就没有啥用了,就会被销毁

其主要作用是在任务量暴增时,适当的增加线程量来完成任务,并且在空闲时间销毁这些救火的线程

实现思路

新增一个参数maximumPoolSize,在原有执行逻辑上修改:任务队列满且线程数小于maximumPoolSize时尝试新增临时线程,临时线程和核心线程主要差别在于读取队列设定时间读取不到时就销毁,可以BlockingQueue的poll(long timeout, TimeUnit unit)方法读取固定时间,读不到就直接跳循环,此时worker的任务执行完毕,线程自动就销毁了

实现

新增必要参数maximumPoolSizekeepAliveTime,通过构造函数实现可配

public class MyThreadPool4 implements Executor {
    // 核心线程数
    private int corePoolSize;
    // 任务队列
    private BlockingQueue<Runnable> workQueue;
    // 线程工厂
    private ThreadFactory threadFactory;
    // 最大线程数
    private volatile int maximumPoolSize;
    // 保持存活时间,纳秒
    private volatile long keepAliveTime;

    public MyThreadPool4(int corePoolSize,
                         int maximumPoolSize,
                         long keepAliveTime,
                         TimeUnit unit,
                         BlockingQueue<Runnable> workQueue,
                         ThreadFactory threadFactory) {
        this.corePoolSize = corePoolSize;
        this.workQueue = workQueue;
        this.threadFactory = threadFactory;
        this.maximumPoolSize = maximumPoolSize;
        // 按单位将时间转换为纳秒
        this.keepAliveTime = unit.toNanos(keepAliveTime);
    }

再区分一下核心worker和临时worker,核心worker一直等待任务队列,而临时worker等待固定时间

private final class Worker implements Runnable {
    Thread thread;
    Runnable firstTask;
    // 新增:是否核心线程
    boolean core;
    Worker(Runnable task, boolean core) {
        firstTask = task;
        thread = threadFactory.newThread(this);
        // 新增:设置是否core
        this.core = core;
    }

core与否在获取任务队列时的区别

private Runnable getTask(boolean core) {
    try {
        // 非core使用BlockingQueue的poll(long timeout, TimeUnit unit)方法等待固定时间
        Runnable runnable = core?workQueue.take():workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
        return null;
    } catch (InterruptedException e) {
        e.printStackTrace();
        return null;
    }
}

此时新增worker时传入是否core的标识

@Override
public void execute(Runnable command) {
    // 如果工作线程少于corePoolSize,新增线程执行,此时core=true
    if (workers.size() < corePoolSize) {
        addWorker(command, true);
        return;
    }
    // 否则添加到任务队列
    if (workQueue.offer(command)) {
        return;
    }
    // 小于最大线程数,新增线程执行,此时core=false
    if (workers.size() < maximumPoolSize) {
        addWorker(command, false);
        return;
    }
    // 任务队列满了:拒绝
    throw new RejectedExecutionException();
}

这样我们的线程池可以支持忙时临时增加线程的功能,测试一下:
设置线程池核心10,最大20,队列10, 执行30个任务,每个任务工作(休眠)3秒,保持时间是10秒

public static void main(String[] args) {
    ThreadFactory threadFactory = new ThreadFactory() {
        private AtomicInteger no = new AtomicInteger(0);
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "my-thread-"+(no.incrementAndGet()));
        }
    };
    Executor poolExecutor = new MyThreadPool4(
            10,
            20,
            10,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(10),
            threadFactory);
    for(int i=1;i<=30;i++) {
        int finalI = i;
        Runnable task = () -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+"-----任务["+finalI+"] done");
        };
        poolExecutor.execute(task);
    }
}

结果如下,3秒左右输出

my-thread-1-----任务[1] done
my-thread-14-----任务[24] done
my-thread-9-----任务[9] done
my-thread-19-----任务[29] done
my-thread-7-----任务[7] done
my-thread-18-----任务[28] done
my-thread-15-----任务[25] done
my-thread-8-----任务[8] done
my-thread-16-----任务[26] done
my-thread-4-----任务[4] done
my-thread-2-----任务[2] done
my-thread-11-----任务[21] done
my-thread-5-----任务[5] done
my-thread-3-----任务[3] done
my-thread-20-----任务[30] done
my-thread-6-----任务[6] done
my-thread-10-----任务[10] done
my-thread-12-----任务[22] done
my-thread-13-----任务[23] done
my-thread-17-----任务[27] done

再过三秒输出

my-thread-1-----任务[11] done
my-thread-9-----任务[13] done
my-thread-19-----任务[14] done
my-thread-18-----任务[16] done
my-thread-7-----任务[15] done
my-thread-14-----任务[12] done
my-thread-15-----任务[17] done
my-thread-4-----任务[20] done
my-thread-8-----任务[18] done
my-thread-16-----任务[19] done

说明:
1~10任务由于小于coreSize,所以增加线程立即执行
11~20由于大于coreSize,所以被加入到队列
21~30由于队列已满,maximumPoolSize未满,直接增加临时线程处理
所以110和2130马上执行,11~20是等第一波任务结束后再执行
再用调试工具跟踪一下

image.png

可以看到前10个核心worker工作完成之后一直等待,后十个临时worker工作完成之后,等待10秒就自动销毁了

淘汰机制

看上图也发现了个问题,上图第一个阶段结束就剩下11~20这10个任务了,此时线程池核心线程就可以应对,按理说再等keepAliveTime设定的10秒,临时worker就改销毁了,可实际是第二阶段临时worker抢走了任务,而核心worker部分闲置,就好比一个公司,工作已经很少了还把工作分给临时工而让正式员工闲着,这显然存在额外开销,因此不合理

结合ThreadPoolExecutor看了一眼,它的解决思路很清晰,解决方案如下:

不区分核心worker或和临时worker,当worker数大于corePoolSize时,采用竞争淘汰手段,哪个worker在keepAliveTime时间段内没有工作,就被淘汰,动态的判断,从而保证工作少时worker数最快的控制在corePoolSize,这样就非常顺滑的节省了资源

那就按照这种思路进行改造:
首先删除worker的core标识,使用一个workerCount标记池中worker数量

// 记录worker数量
private volatile AtomicInteger workerCount = new AtomicInteger(0);

当然addWorker时就需要workerCount++,然后再getTask方法上下功夫,把core标识的校验修改为workerCount>corePoolSize,大概如下

// 从队列获取任务
private Runnable getTask() {
    // 判断worker数量是否大于corePoolSize,如果大就使用固定时间等待
    boolean timed =  workerCount.get() > corePoolSize;
    try {
        Runnable runnable = !timed?workQueue.take():workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
        return runnable;
    } catch (InterruptedException e) {
        return null;
    }
}

但真这样写问题太大了,首先workerCount > corePoolSize校验结束后,一定要执行workerCount--操作的,否则所有的线程都会变成阻塞固定时间,如果任务队列任务很少,那么很多线程都被淘汰了,线程数会少于corePoolSize
一般思路是判断完成之后马上做减法,但线程池毕竟是多个线程,可能同时走到workerCount > corePoolSize的判断,再快也不管用

所以正确的姿势做自旋+CAS的方法避免线程销毁出界,如下

// 从队列获取任务
private Runnable getTask() {
    // 自旋
    for(;;) {
        // 判断worker数量是否大于corePoolSize,如果大就使用固定时间等待
        int wc = workerCount.get();
        boolean timed = wc > corePoolSize;
        if (timed) {
            // CAS 减一操作
            if(!workerCount.compareAndSet(wc, wc - 1)) {
                // 减一失败,重新验证
                continue;
            }
        }
        try {
            // 限制时间使用BlockingQueue的poll(long timeout, TimeUnit unit)方法等待固定时间
            Runnable runnable = !timed ? workQueue.take() : workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
            return runnable;
        } catch (InterruptedException e) {
            e.printStackTrace();
            return null;
        }
    }
}

这样就避免了销毁多了,导致线程不满corePoolSize的现象发生,但是还是有个严重问题:wc > corePoolSize只是判断worker数是否大于corePoolSize,此时只是固定时间阻塞,worker并未实际销毁,workerCount和实际的worker数量出现不匹配,如果固定时间内接到了任务那么这个worker可能会存活很久,workerCount就会一直与worker数量不匹配,这肯定不行

所以问题的关键在于workerCount--操作之后,线程是一定要销毁掉的,即一定要返回null,那么可以修改个思路:worker数大于corePoolSize后,所有线程getTask时都固定阻塞一段时间等待,如果等待不到再去做workerCount--,通过CAS加自选保证只有超过corePoolSize数量实际workerCount--成功,没成功的就是corePool继续等待任务,代码改动如下:

// 从队列获取任务
private Runnable getTask() {
    boolean timedOut = false; // 标识最后一次poll是否超时?
    // 自旋
    for (; ; ) {
        // 判断worker数量是否大于corePoolSize,如果大就使用固定时间等待
        int wc = workerCount.get();
        boolean timed = wc > corePoolSize;
        if (timed && timedOut) {
            // CAS 减一操作
            if (workerCount.compareAndSet(wc, wc - 1))
                // worker成功登记下线,返回null,worker就一定会下线
                return null;
            // 登记失败,重新判断
            continue;
        }
        try {
            // 限制时间使用BlockingQueue的poll(long timeout, TimeUnit unit)方法等待固定时间
            Runnable r = !timed ? workQueue.take() : workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
            if (r != null)
                // 只有任务获取成功才返回
                return r;
            // 任务获取失败,回头去判断是否改下线
            timedOut = true;
        } catch (InterruptedException e) {
            e.printStackTrace();
            return null;
        }
    }
}

代码有点,整理一下一个worker获取task的流程图如下


getTask流程图

再测试一次,队列大小修改为5,此时结果如下图

image.png

可以看到抢到第二波任务的线程一定不会销毁,其中编号17,18成为了coolPoolSize的一员,而编号3,4的线程虽然创建的早,但是没有接到任务就被淘汰了,此时的淘汰机制没有先来后到,只看手中是否有任务,而最终保留的线程数即为coolPoolSize数

拒绝策略

这个就比较简单了,通过策略模式,可以实现再拒绝时走固定的策略而不是像上面代码单纯的throw异常,策略接口直接用Doug Lea的RejectedExecutionHandler吧

最终代码

public class MyThreadPool implements Executor {
    // 核心线程数
    private int corePoolSize;
    // 任务队列
    private BlockingQueue<Runnable> workQueue;
    // 线程工厂
    private ThreadFactory threadFactory;
    // 最大线程数
    private volatile int maximumPoolSize;
    // 保持存活时间,纳秒
    private volatile long keepAliveTime;
    // 记录worker数量
    private volatile AtomicInteger workerCount = new AtomicInteger(0);
    // 拒绝策略
    RejectedExecutionHandler handler;

    public MyThreadPool(int corePoolSize,
                        int maximumPoolSize,
                        long keepAliveTime,
                        TimeUnit unit,
                        BlockingQueue<Runnable> workQueue,
                        ThreadFactory threadFactory,
                        RejectedExecutionHandler handler) {
        this.corePoolSize = corePoolSize;
        this.workQueue = workQueue;
        this.threadFactory = threadFactory;
        this.maximumPoolSize = maximumPoolSize;
        // 按单位将时间转换为纳秒
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.handler = handler;
    }

    public static void main(String[] args) {
        System.out.println(TimeUnit.MILLISECONDS.toNanos(1));
    }

    // 保存所有的worker
    private final HashSet<Worker> workers = new HashSet<Worker>();

    @Override
    public void execute(Runnable command) {
        // 如果工作线程少于corePoolSize,新增线程执行,此时core=true
        if (workerCount.get() < corePoolSize) {
            addWorker(command, true);
            return;
        }
        // 否则添加到任务队列
        if (workQueue.offer(command)) {
            return;
        }
        // 小于最大线程数,新增线程执行,此时core=false
        if (workerCount.get() < maximumPoolSize) {
            addWorker(command, false);
            return;
        }
        handler.rejectedExecution(command, null);
    }

    /**
     * 添加工作者
     *
     * @param command
     */
    private void addWorker(Runnable command, boolean core) {
        Worker w = new Worker(command);
        final Thread t = w.thread;
        t.start();
        workers.add(w);
        workerCount.incrementAndGet();
    }

    // 拥有工作任务的工作者
    private final class Worker implements Runnable {
        // 线程
        Thread thread;
        // 第一个任务
        Runnable firstTask;

        // 工作者初始化
        Worker(Runnable task) {
            // 保存第一个任务
            firstTask = task;
            // 初始化工作台
            thread = threadFactory.newThread(this);
        }

        // 工作者的任务计划
        @Override
        public void run() {
            Runnable task = firstTask;
            firstTask = null;
            // 第一个任务未执行执行第一个,执行完后去队列获取任务,执行结束后再次尝试获取
            while (task != null || (task = getTask()) != null) {
                try {
                    task.run();
                } finally {
                    // 完成后删除任务
                    task = null;
                }
            }
            // 任务结束,删除worker
            workers.remove(this);
        }

        // 从队列获取任务
        private Runnable getTask() {
            boolean timedOut = false; // 标识最后一次poll是否超时?
            // 自旋
            for (; ; ) {
                // 判断worker数量是否大于corePoolSize,如果大就使用固定时间等待
                int wc = workerCount.get();
                boolean timed = wc > corePoolSize;
                if (timed && timedOut) {
                    // CAS 减一操作
                    if (workerCount.compareAndSet(wc, wc - 1))
                        // worker成功登记下线,返回null,worker就一定会下线
                        return null;
                    // 登记失败,重新判断
                    continue;
                }
                try {
                    // 限制时间使用BlockingQueue的poll(long timeout, TimeUnit unit)方法等待固定时间
                    Runnable r = !timed ? workQueue.take() : workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
                    if (r != null)
                        // 只有任务获取成功才返回
                        return r;
                    // 任务获取失败,回头去判断是否改下线
                    timedOut = true;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    return null;
                }
            }
        }
    }
}

测试代码

public class MyTest {
    public static void main(String[] args) {
        // 线程工厂
        ThreadFactory threadFactory = new ThreadFactory() {
            private AtomicInteger no = new AtomicInteger(0);
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "pq-thread-"+(no.incrementAndGet()));
            }
        };
        // 拒绝策略
        RejectedExecutionHandler rejectedExecutionHandler = (r, executor) -> {
            System.out.println("为什么拒绝我!");
        };
        Executor poolExecutor = new MyThreadPool5(
                10,
                20,
                10,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(5),
                threadFactory,
                rejectedExecutionHandler);
        for(int i=1;i<=30;i++) {
            int finalI = i;
            Runnable task = () -> {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName()+"-----任务["+finalI+"] done");
            };
            poolExecutor.execute(task);
        }
    }
}

总结

如开篇所属,自己写线程池的目的是为了理解线程池类:ThreadPoolExecutor,基本最终代码就是把ThreadPoolExecutor的核心代码拿过来,运行跑一跑,再回头看基本就明白Doug Lea代码为什么这么写了,但只是实现了一个简单版,而且完全没有考虑多线程运行同一个线程池时的线程安全问题(尝试考虑过,但太复杂了,不得不佩服大神)
ThreadPoolExecutor包含大量的线程安全处理和异常处理,并且workerCount的计算用的二进制方式,很高逼格,能力有限,感觉自己理解到这程度足以,感兴趣的可以自己去看一下~

有关并发专题-自己动手实现一个线程池的更多相关文章

  1. ruby - 使用 Vim Rails,您可以创建一个新的迁移文件并一次性打开它吗? - 2

    使用带有Rails插件的vim,您可以创建一个迁移文件,然后一次性打开该文件吗?textmate也可以这样吗? 最佳答案 你可以使用rails.vim然后做类似的事情::Rgeneratemigratonadd_foo_to_bar插件将打开迁移生成的文件,这正是您想要的。我不能代表textmate。 关于ruby-使用VimRails,您可以创建一个新的迁移文件并一次性打开它吗?,我们在StackOverflow上找到一个类似的问题: https://sta

  2. ruby-on-rails - Rails - 一个 View 中的多个模型 - 2

    我需要从一个View访问多个模型。以前,我的links_controller仅用于提供以不同方式排序的链接资源。现在我想包括一个部分(我假设)显示按分数排序的顶级用户(@users=User.all.sort_by(&:score))我知道我可以将此代码插入每个链接操作并从View访问它,但这似乎不是“ruby方式”,我将需要在不久的将来访问更多模型。这可能会变得很脏,是否有针对这种情况的任何技术?注意事项:我认为我的应用程序正朝着单一格式和动态页面内容的方向发展,本质上是一个典型的网络应用程序。我知道before_filter但考虑到我希望应用程序进入的方向,这似乎很麻烦。最终从任何

  3. ruby-on-rails - 渲染另一个 Controller 的 View - 2

    我想要做的是有2个不同的Controller,client和test_client。客户端Controller已经构建,我想创建一个test_clientController,我可以使用它来玩弄客户端的UI并根据需要进行调整。我主要是想绕过我在客户端中内置的验证及其对加载数据的管理Controller的依赖。所以我希望test_clientController加载示例数据集,然后呈现客户端Controller的索引View,以便我可以调整客户端UI。就是这样。我在test_clients索引方法中试过这个:classTestClientdefindexrender:template=>

  4. ruby-on-rails - 如果 Object::try 被发送到一个 nil 对象,为什么它会起作用? - 2

    如果您尝试在Ruby中的nil对象上调用方法,则会出现NoMethodError异常并显示消息:"undefinedmethod‘...’fornil:NilClass"然而,有一个tryRails中的方法,如果它被发送到一个nil对象,它只返回nil:require'rubygems'require'active_support/all'nil.try(:nonexisting_method)#noNoMethodErrorexceptionanymore那么try如何在内部工作以防止该异常? 最佳答案 像Ruby中的所有其他对象

  5. ruby - 为什么 SecureRandom.uuid 创建一个唯一的字符串? - 2

    关闭。这个问题需要detailsorclarity.它目前不接受答案。想改进这个问题吗?通过editingthispost添加细节并澄清问题.关闭8年前。Improvethisquestion为什么SecureRandom.uuid创建一个唯一的字符串?SecureRandom.uuid#=>"35cb4e30-54e1-49f9-b5ce-4134799eb2c0"SecureRandom.uuid方法创建的字符串从不重复?

  6. ruby - RuntimeError(自动加载常量 Apps 多线程时检测到循环依赖 - 2

    我收到这个错误:RuntimeError(自动加载常量Apps时检测到循环依赖当我使用多线程时。下面是我的代码。为什么会这样?我尝试多线程的原因是因为我正在编写一个HTML抓取应用程序。对Nokogiri::HTML(open())的调用是一个同步阻塞调用,需要1秒才能返回,我有100,000多个页面要访问,所以我试图运行多个线程来解决这个问题。有更好的方法吗?classToolsController0)app.website=array.join(',')putsapp.websiteelseapp.website="NONE"endapp.saveapps=Apps.order("

  7. ruby - 如何根据特征实现 FactoryGirl 的条件行为 - 2

    我有一个用户工厂。我希望默认情况下确认用户。但是鉴于unconfirmed特征,我不希望它们被确认。虽然我有一个基于实现细节而不是抽象的工作实现,但我想知道如何正确地做到这一点。factory:userdoafter(:create)do|user,evaluator|#unwantedimplementationdetailshereunlessFactoryGirl.factories[:user].defined_traits.map(&:name).include?(:unconfirmed)user.confirm!endendtrait:unconfirmeddoenden

  8. ruby-on-rails - Rails - 从另一个模型中创建一个模型的实例 - 2

    我有一个正在构建的应用程序,我需要一个模型来创建另一个模型的实例。我希望每辆车都有4个轮胎。汽车模型classCar轮胎模型classTire但是,在make_tires内部有一个错误,如果我为Tire尝试它,则没有用于创建或新建的activerecord方法。当我检查轮胎时,它没有这些方法。我该如何补救?错误是这样的:未定义的方法'create'forActiveRecord::AttributeMethods::Serialization::Tire::Module我测试了两个环境:测试和开发,它们都因相同的错误而失败。 最佳答案

  9. ruby - 用 Ruby 编写一个简单的网络服务器 - 2

    我想在Ruby中创建一个用于开发目的的极其简单的Web服务器(不,不想使用现成的解决方案)。代码如下:#!/usr/bin/rubyrequire'socket'server=TCPServer.new('127.0.0.1',8080)whileconnection=server.acceptheaders=[]length=0whileline=connection.getsheaders想法是从命令行运行这个脚本,提供另一个脚本,它将在其标准输入上获取请求,并在其标准输出上返回完整的响应。到目前为止一切顺利,但事实证明这真的很脆弱,因为它在第二个请求上中断并出现错误:/usr/b

  10. ruby - 一个 YAML 对象可以引用另一个吗? - 2

    我想让一个yaml对象引用另一个,如下所示:intro:"Hello,dearuser."registration:$introThanksforregistering!new_message:$introYouhaveanewmessage!上面的语法只是它如何工作的一个例子(这也是它在thiscpanmodule中的工作方式。)我正在使用标准的ruby​​yaml解析器。这可能吗? 最佳答案 一些yaml对象确实引用了其他对象:irb>require'yaml'#=>trueirb>str="hello"#=>"hello"ir

随机推荐