当前位置 > it书童 > java > 正文

线程池-治理线程的最大法宝

java it书童 2021-01-09 17:09:19 0赞 0踩 49阅读 0评论

并发工具类的作用

  1. 为了并发安全

  2. 管理线程、提高效率

  3. 线程协作

线程池

线程池作用:

  • 控制资源总量

  • 复用线程

如果不使用线程池,每个任务都新开一个线程处理

package juc.threadpool;

public class EveryTaskOneThread {
    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            Thread thread = new Thread(new Task());
            thread.start();
        }
    }

    static class Task implements Runnable {
        @Override
        public void run() {
            System.out.println("执行了任务");
        }
    }
}

这种方式开销太大,反复创建并销毁线程的开销是很大的

  1. 反复创建线程开销大

  2. 过多的线程会占用太多内存

线程池的好处:

  1. 加快响应速度

  2. 合理利用 CPU 和内存

  3. 统一管理线程

增减线程的时机

线程池构造函数的参数

corePoolSize: 核心线程数

线程池在完成初始化后,默认情况下,线程池中并没有任何线程,线程池会等待有任务到来时,再创建新线程去执行任务

线程池有可能会在核心线程数的基础上,额外增加一些线程,但是这些新增的线程数有一个上限,就是最大量 maxPoolSize

添加线程规则:

  1. 如果线程数 < corePoolSize, 即使其他工作线程处于空闲状态,也会创建一个新线程来运行新任务

  2. 如果线程数 >= corePoolSize,且 < maximumPoolsize,则将任务放入队列

  3. 如果队列已满,且线程数 < maximumPoolsize, 则创建一个新线程来运行任务

  4. 如果队列,且线程数 > maximumPoolsize,则拒绝该任务

增减线程的特点:

  1. 通过设置 corePoolSize 和 maximumPoolsize,就可以创建固定大小的线程池

  2. 线程池希望保持较少的线程数,并且只有在负载变得很大时才增加它

  3. 如果使用的是无界队列(如:LinkedBlockingQueue),那么线程数就不会超过 corePoolSize

keepAliveTime: 如果线程池当前的线程数多于 corePoolSize,多余的线程空闲时间超过 keepAliveTime, 它们就会被终止

线程工厂: 新的线程是由 ThreadFacotry 创建的,创建出来的线程都在同一个线程组,拥有同样的优先级

工作队列: 有3种最常见的队列类型

  1. 直接交接:SynchronousQueue

  2. 无界队列:LinkedBlockingQueue

  3. 有界队列:ArrayBlockingQueue

线程池分类

  1. FixedThreadPool

package juc.threadpool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FixedThreadPoolOOM {

    private static ExecutorService executorService = Executors.newFixedThreadPool(1);

    public static void main(String[] args) {
        for (int i = 0; i < Integer.MAX_VALUE; i++) {
            executorService.execute(new SubThread());
        }
    }
}

class SubThread implements Runnable {
    @Override
    public void run() {
        try {
            Thread.sleep(1000000000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

用固定线程池存在的风险:如果使用的是无界队列,任务一直堆积在队列,导致 OOM

  1. SingleThreadExecutor

只有单个线程的线程池

package juc.threadpool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleThreadExecutor {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 1000; i++) {
            executorService.execute(new Task());
        }
    }
}
  1. CachedThreadPool

package juc.threadpool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CachedThreadPool {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 1000; i++) {
            executorService.execute(new Task());
        }
    }
}
  1. ScheduledThreadPool:

支持定时及周期性执行任务的线程池

package juc.threadpool;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledThreadPoolTest {
    public static void main(String[] args) {
        ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(10);
        threadPool.schedule(new Task(), 5, TimeUnit.SECONDS); // 延迟 5 秒执行
        threadPool.scheduleAtFixedRate(new Task(), 1, 3, TimeUnit.SECONDS); // 每隔 3 秒执行一次
    }
}

以上 4 种线程池的构造函数参数:

JDK 1.8 加入了 workStealingPool 线程池,适用于产生子任务的场景

综上,线程池应该选择手动创建,自动创建的线程池与我们的业务契合度不高

线程池里的线程数量

线程池里的线程数量设定为多少比较合适?

  • CPU 密集型:最佳线程数为 CPU 核心数的 1-2 倍左右

  • 耗时 IO 型:最佳线程数一般会大于 CPU 核心数的很多倍,以 JVM 线程监控显示繁忙情况为依据

估算公式:线程数 = CPU 核心数 * (1 + 平均等待时间/平均工作时间)

更精准的计算:根据不同程序做压测,得出结果

停止线程池的方法

shutdown:不是马上就停,只是发起了通知。发起通知后,有新的任务会被拒绝,存量任务会被执行完

package juc.threadpool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ShutDown {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 1000; i++) {
            executorService.execute(new ShutDownTask());
        }
        Thread.sleep(1500);
        executorService.shutdown();
        // shutdown 之后再提交的任务被拒绝了
        executorService.execute(new ShutDownTask());
    }
}

class ShutDownTask implements Runnable {
    @Override
    public void run() {
        try {
            Thread.sleep(500);
            System.out.println(Thread.currentThread().getName());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

isShutdown():是否发起了 shutdown

isTerminated(): 是否完全停止了

awaitTermination

executorService.shutdown();
boolean b = executorService.awaitTermination(10L, TimeUnit.SECONDS); // 10秒后是否已关闭

shutdownNow: 暴力关闭,立即执行

给正在执行的线程发送 interrupt 信号,队列中在等待的任务直接返回

package juc.threadpool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ShutDown {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 100; i++) {
            executorService.execute(new ShutDownTask());
        }
        Thread.sleep(1500);
        executorService.shutdownNow();
    }
}

class ShutDownTask implements Runnable {
    @Override
    public void run() {
        try {
            Thread.sleep(500);
            System.out.println(Thread.currentThread().getName());
        } catch (InterruptedException e) {
            System.out.println(Thread.currentThread().getName() + " 被中断了");
        }
    }
}

线程池如何拒绝新任务

线程池拒绝新任务的时机:

  1. 当 Executor 关闭时,提交新任务会被拒绝

  2. 当 Executor 对最大线程和工作队列容量使用有限边界并且已经饱和时

拒绝策略:

  • AbortPolicy

  • DiscardPolicy

  • DiscardOldestPolicy

  • CallerRunsPolicy

线程池钩子方法

钩子方法:每个任务执行前后可加一些功能

以下演示利用线程池钩子函数实现线程暂停与恢复的功能

package juc.threadpool;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 演示每个任务执行前后放钩子函数
 */
public class PauseableThreadPool extends ThreadPoolExecutor {

    private boolean isPaused;

    private final ReentrantLock lock = new ReentrantLock();
    private Condition unpaused = lock.newCondition();

    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        lock.lock();
        try {
            while (isPaused) {
                unpaused.await();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void resume() {
        lock.lock();
        try {
            isPaused = false;
            unpaused.signalAll();
        } finally {
            lock.unlock();
        }
    }

    private void pause() {
        lock.lock();
        try {
            isPaused = true;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PauseableThreadPool pauseableThreadPool = new PauseableThreadPool(10, 20, 10l, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                System.out.println("我被执行");
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        for (int i = 0; i < 10000; i++) {
            pauseableThreadPool.execute(runnable);
        }
        Thread.sleep(1500);
        pauseableThreadPool.pause();
        System.out.println("线程池被暂停了");
        Thread.sleep(1500);
        pauseableThreadPool.resume();
        System.out.println("线程池被恢复了");
    }

}

线程池组成部分:

  • 线程池管理器

  • 工作线程

  • 任务队列

  • 任务接口

线程池复用线程的原理:

重复调用线程的 run 方法,执行不同的任务

线程池状态

  • RUNNING 接受新任务并处理排队任务

  • SHUTDOWN 不接受新任务,但处理排队任务

  • STOP 不接受新任务,也不处理排队任务,并中断正在进行的任务

  • TIDYING 所有任务都已终止,将运行 terminate() 钩子方法

  • TERMINATED 运行完成

使用线程池的注意点:

  • 避免任务堆积

  • 避免线程数过度增加

  • 排查线程泄漏(线程一直未被回收)

关于我
一个文科出身的程序员,追求做个有趣的人,传播有价值的知识,微信公众号主要分享读书思考心得,不会有代码类文章,非程序员的同学请放心订阅
转载须注明出处:https://www.itshutong.com/articles/1016