多线程

7/11/2023 JUC

线程之间通信 加锁的目的是为了控制对共享资源的访问

# 多线程通信方式

  • volatile
  • synchronized
  • ReentrantLock/Condition
  • CountdownLatch
  • LockSupport
  • Semaphore
  • Pipeline

# 线程池

# 线程池大小设置问题

硬件资源:

  • CPU个数
  • 内存大小
  • 计算密集型/IO密集型

对于计算密集型任务,设置线程池的大小为 N(cpu) + 1能实现最优解

对于IO密集型任务,需要估算 等待时间 和 计算时间 比值 N(cpu) * U(cpu) * (1 + W/C)

N(cpu): cpu数量, U(cpu): cpu使用率, W:等待时间, C:cpu时间时间

int n_cpu = Runtime.getRuntime().availableProcessors()

  • 基本大小:没有任务执行时,线程池大小
  • 最大大小:允许的最大线程数(只有在工作队列满时才会创建超过 基本大小的 线程树)
  • 存活时间:当钱线程池超过了基本大小,并且某个线程空闲时间超过存活时间,将进行线程回收
  • 任务队列:没有线程供使用将进入队列等待
  • 饱和策略:任务队列满时执行的操作
  • 线程池工程: 自定义线程创建

CacheThreadPool 为每个任务创建一个线程,并且可以重用现有线程,无核心线程数,阻塞队列没有线程数量限制,超过60s的空闲线程将会被回收

FixedThreadPool 核心线程数和最大线程数是相同的,并且无空闲线程, 阻塞队列没有线程数量限制

ScheduledThreadPool 固定的核心线程数,最大线程数没有限制,并且无空闲线程,阻塞队列没有线程数量限制

SingleThreadPool 只有一个线程的线程池

当任务之间存在依赖时,推荐使用无界队列, 当任务之间独立, 推荐使用有界队列

# ExecutorService

该接口接管执行服务的生命周期, 线程池生命周期主要有三种状态: 运行、关闭、已终止。

  • 运行: 线程池在初始化时处于运行状态
  • 关闭: 线程池将不在接受新的任务,同时等待已经提交的任务执行完成(包括那些未开始执行的任务)
  • 已终止: 线程池已经不在运行

shutdown: 将线程设置未关闭状态 shutdownNow: 将取消所有正在运行的任务,并且不在启动队列中尚未开始执行的任务

# 常用工具类

# CompletionService

本质是将线程池和队列融合在一起; 通常我们将任务放入线程池后, 通过get方法阻塞获取。但当我们有多个任务,且任务执行的 时间长短不一,阻塞获取结果的方法效率将非常低。可以通过一个类似优先级队列东西,我们从队列中取出最先执行完成的任务结果, 进行后续操作,任务没有执行完成,就在队列中进行等待。

简单实现

public class Test {
    
   public static void main(String[] args) throws ExecutionException, InterruptedException {
      ExecutorService es = Executors.newFixedThreadPool(3);
      CompletionService<String> ecs = new ExecutorCompletionService<String>(es);

      // 模拟渲染页面
      ecs.submit(() -> {
         sleep(1500);
         return "render page";
      });
      
      // 模拟渲染10张图片
      for (int i=0; i< 10; i++) {
         ecs.submit(() -> {
            sleep(1000);
            return "render image";
         });
      }
      
      for (int i=0; i< 11; i++) {
         Future<String> poll = ecs.take();
         System.out.println(poll.get());
      }

      es.shutdown();
   }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

# CompletableFuture

then开头方法末尾 如果有加 Async, 表示then方法之前的线程和then方法里面的线程不共用同一个线程池, 使用ForkingJoin线程池

  1. 任务异步回调

    • thenRun/thenRunAsync: 不关心上一个任务的执行结果,无参数,无返回
    • thenAccept/thenAcceptAsync: 依赖上一个任务的返回结果,有传参,无返回
    • thenApply/thenApplyAsync: 依赖上一个任务的返回结果,有传参,有返回
    • exceptionally: 任务执行异常时执行回调方法
    • whenComplete: 任务执行完成后的回调方法,无返回值
    • handler: 任务执行完成后执行的回调方法,有返回值
  2. 多任务组合

    AND:

    • thenCombine/thenCombineAsync: 将2个任务的执行结果作为入参,传入到指定方法中,且有返回
    • thenAcceptBoth/thenAcceptBothAsync: 将2个任务的执行结果作为入参,传入到指定方法中,无返回
    • runAfterBoth/runAfterBothAsync: 不会把执行结果当做方法入参,且没有返回

    OR:

    • applyToEither/applyToEitherAsync: 将已经完成的任务作为方法的入参,传递到指定方法中,且有返回
    • acceptEither/acceptEitherAsync: 将已经完成的任务作为方法的入参,传递到指定方法中, 但无返回
    • runAfterEither/runAfterEitherAsync: 不会把执行结果当做方法入参,且没有返回

    allOf:

    • 所有任务都执行完成后,才异步执行该方法

    anyOf:

    • 任意一个线程执行完成后, 异步执行该方法

    thenCompose:

    • 某个方法执行完后,将该任务的执行结果作为方法入参,去执行指定方法,方法会返回一个新的completableFuture对象

future 需要获取到返回值, 才能看到异常

默认线程池需要考虑生产环境是否适用

# CountDownLatch

# CyclicBarrier

# Semaphore

# synchronized

synchronized实现机制是对对象加锁, java中每个对象都关联一个监视器ObjectMonitor, 监视器地址记录在对象的 MarkWord

ObjectMonitor 大致结构如下

ObjectMonitor() {
    ...
    _count = 0; // 记录重入次数
    _owner = NULL;  // 记录持有线程
    _cxq = NULL;    // 记录锁阻塞线程 (锁竞争失败进入的阻塞队列)
    _WaitSet = NULL;    // 记录处于wait状态的线程 (同步队列)
    _EntryList = NULL;  // 记录处于锁阻塞状态的线程 (notify从同步队列移入到阻塞队列中获取锁)
    ...
}
1
2
3
4
5
6
7
8
9

_cxq 和 _EntryList 共通组成阻塞队列

synchronized关键在于两条指令 monitorentermonitorexit

monitorenter: 获取对象 monitor 所有权

  • 如果monitor_count为 0,没有线程持有锁,直接将其加一
  • 如果monitor 被线程占有,检测是否是当前线程,是当前线程加一,否则阻塞当前线程

monitorexit: 将计数器减一,如果为0,则释放锁

# interrupt

Java的中断是一种协作机制,也就是说通过中断并不能直接中断另外一个线程,而需要被中断的线程自己处理中断。

在Java的中断模型中,每个线程都有一个boolean标识,代表着是否有中断请求(该请求可以来自所有线程,包括被中断的线程本身)。 例如,当线程t1想中断线程t2,只需要在线程t1中将线程t2对象的中断标识置为true,然后线程2可以选择在合适的时候处理该中断请求。

中断方法:

  • interrupt: 设置线程的中断标志位为true
  • isInterrupted: 判断线程是否中断
  • interrupted: 返回当前线程的中断标志位,并清除中断标志

可中断的阻塞: sleep、wait、join、park等, 调用interrupt方法会抛中断异常,同时会清除中断标志位

不可中断的阻塞: 同步 Socket I/O, 同步I/O, synchronized等