Java 多线程编程之七队列、线程池、线程通信

大纲

队列

队列类族

BlockingQueue 阻塞队列是一个接口,它有七个实现类,如下所示:

  • ArrayBlockQueue:由数组结构组成的有界阻塞队列
  • LinkedBlockingQueue:由链表结构组成的有界阻塞队列,虽然是有界的,但是界限非常大(默认大小 Integer.MAX_VALUE),相当于无界,也就是可以当成无界
  • PriorityBlockQueue:支持优先级排序的无界阻塞队列
  • DelayQueue:使用优先级队列实现的延迟无界阻塞队列
  • SynchronousQueue:不存储元素的阻塞队列,即生产一个,消费一个
  • LinkedTransferQueue:由链表结构组成的无界阻塞队列
  • LinkedBlockingDeque:由链表结构组成的双向阻塞队列

提示

必须重点掌握的队列类:ArrayBlockQueue、LinkedBlockingQueue、SynchronousQueue

阻塞队列

阻塞队列的介绍

BlockingQueue 阻塞队列,排队拥堵。首先它是一个队列,而一个阻塞队列在数据结构中所起的作用大致如下图所示:

线程 1 往阻塞队列尾部的插入元素,而线程 2 从阻塞队列的头部移除元素。BlockingQueue 阻塞队列的特点如下:

  • 当阻塞队列是空时,从队列中获取元素的操作将会被阻塞
  • 当阻塞队列是满时,从队列中插入元素的操作将会被阻塞

也就是说试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其它线程往空的队列添加新的元素。同理,试图往已经满的阻塞队列中添加新元素的线程会被阻塞,直到其它线程往已满的队列中移除一个或多个元素,或者完全清空队列后,使队列重新变得空闲起来才能添加新元素。在多线程编程领域,所谓的阻塞,也就是在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动唤醒。

为什么需要 BlockingQueue?

使用 BlockingQueue 的优点是不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切都由 BlockingQueue 一手包办了。在 JUC 包发布以前,在多线程环境下,每个程序员都必须自己控制这些实现细节,尤其还要兼顾效率和线程安全,而这会给程序带来不小的复杂度。

阻塞队列的核心方法

  • 抛出异常

    • 当阻塞队列为空,在调用 remove() 方法往队列中移除元素时,会抛出 NoSuchException 异常。
    • 当阻塞队列已满,在调用 add() 方法往队列中插入元素时,会抛出 IIIegalStateException:Queue full 异常。
  • 特殊结果

    • 执行 offer() 插入方法,成功返回 true,失败返回 false。
    • 执行 poll() 移除方法,成功返回移出队列的元素,队列为空会返回 NULL。
  • 检查队列

    • 当阻塞队列为空,在调用 element() 方法检查队列时,会抛出 NoSuchElementException 异常。
    • 当阻塞队列不为空,在调用 element() 方法检查队列时,会返回队列头部的第一个元素。
    • 在调用 peek() 方法检查队列时,成功会返回队列头部的第一个元素,失败会返回 NULL。
  • 一直阻塞

    • 当阻塞队列满,生产者继续往队列里 Put 元素时,队列会一直阻塞生产线程直到成功 Put 数据或者响应中断退出。
    • 当阻塞队列空,消费者线程试图从队列里 Take 元素,队列会一直阻塞消费者线程直到队列可用(即存在至少一个队列元素)。
  • 超时恢复

    • 当阻塞队列已满,在调用 offer(E e, long timeout, TimeUnit unit) 方法往队列中插入元素时,队列会阻塞生产者线程一段时间,超过等待时间后生产者线程会恢复执行。
    • 当阻塞队列为空,在调用 poll(long timeout, TimeUnit unit) 方法往队列中移除元素时,队列会阻塞消费者线程一段时间,超过等待时间后消费者线程会恢复执行。

阻塞队列的使用案例

使用案例一
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class BlockingQueueDemo {

public static void main(String[] args) {
BlockingQueue blockingQueue = new ArrayBlockingQueue(3);

// 插入元素(会抛出异常)
System.out.println(blockingQueue.add("A"));
System.out.println(blockingQueue.add("B"));
System.out.println(blockingQueue.add("C"));

// 检查队列,队列不为空返回队列头部的第一个元素,队列为空则抛出异常
System.out.println(blockingQueue.element());

// 移除元素(会抛出异常)
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
}

}

程序运行输出的结果:

1
2
3
4
5
6
7
true
true
true
A
A
B
C
使用案例二
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class BlockingQueueDemo2 {

public static void main(String[] args) {
BlockingQueue blockingQueue = new ArrayBlockingQueue(3);

// 插入元素(不会抛出异常)
System.out.println(blockingQueue.offer("AA"));
System.out.println(blockingQueue.offer("BB"));
System.out.println(blockingQueue.offer("CC"));
System.out.println(blockingQueue.offer("DD"));

// 检查队列,成功会返回队列头部的第一个元素,队列为空会返回 NULL(不会抛出异常)
System.out.println(blockingQueue.peek());

// 移除元素(不会抛出异常)
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
}

}

程序运行输出的结果:

1
2
3
4
5
6
7
8
9
true
true
true
false
AA
AA
BB
CC
null
使用案例三

当阻塞队列为空,在调用 poll(long timeout, TimeUnit unit) 方法往队列中移除元素时,队列会阻塞消费者线程一段时间,超过等待时间后消费者线程会恢复执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class BlockingQueueDemo3 {

public static void main(String[] args) {
BlockingQueue blockingQueue = new ArrayBlockingQueue(3);
try {
blockingQueue.put("AAA");
blockingQueue.put("BBB");
blockingQueue.put("CCC");

// 移除元素
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());

// 当队列为空,往队列移除元素时,阻塞等待 5 秒,超时后往下继续执行
System.out.println(blockingQueue.poll(5, TimeUnit.SECONDS));

} catch (Exception e) {
e.printStackTrace();
}
}

}

程序运行输出的结果:

1
2
3
4
AAA
BBB
CCC
null
使用案例四

SynchronousQueue 是没有容量的队列,与其他 BlockingQueue 阻塞队列不同,SynchronousQueue 是一个不存储任何元素的 BlockingQueue。简而言之,每一个 Put 操作都必须等待一个 Take 操作完成,否者不能继续插入元素。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

public class BlockingQueueDemo4 {

public static void main(String[] args) {
BlockingQueue blockingQueue = new SynchronousQueue();

new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " put 1");
blockingQueue.put("1");
System.out.println(Thread.currentThread().getName() + " put 2");
blockingQueue.put("2");
System.out.println(Thread.currentThread().getName() + " put 3");
blockingQueue.put("3");
} catch (Exception e) {
e.printStackTrace();
}
}, "T1").start();

new Thread(() -> {
try {
// 等待 5 秒
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + " get " + blockingQueue.take());

// 等待 5 秒
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + " get " + blockingQueue.take());

// 等待 5 秒
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + " get " + blockingQueue.take());
} catch (Exception e) {
e.printStackTrace();
}
}, "T2").start();
}

}

程序运行输出的结果:

1
2
3
4
5
6
T1 put 1
T2 get 1
T1 put 2
T2 get 2
T1 put 3
T2 get 3

从上述的运行结果可以看出,每次 T1 生产线程向阻塞队列添加元素后,T1 生产线程就会等待 T2 消费线程,T2 线程消费完之后处于挂起状态,等待 T1 生产线程再次添加元素,从而周而复始,形成一存一取的状态。

线程池

线程池的概述

线程池是一种线程管理的机制,用于提高多线程任务处理的效率和性能。它由线程池管理器、工作队列和一组工作线程组成。

  • 线程池管理器:负责创建、管理和调度线程池中的线程。它根据需要动态地创建或销毁线程,并分配任务给空闲的线程。
  • 工作队列:用于存储待执行的任务。线程池中的线程从工作队列中取出任务进行处理。当工作队列已满时,新提交的任务可能会被拒绝或者等待一段时间。
  • 工作线程:线程池中的实际执行单元。它们循环地从工作队列中取出任务执行,并在任务执行完毕后返回线程池等待下一次任务。

线程池的优点

  • 降低资源消耗:减少了线程的创建和销毁频率,降低了系统开销。
  • 提高响应速度:线程池中的线程通常是预先创建好的,可以立即处理任务,避免了线程创建的延迟。
  • 提高系统稳定性:通过控制线程的数量,避免了系统资源被耗尽的风险,防止系统因过度并发而崩溃。
  • 提高可管理性:通过线程池管理器,可以方便地监控、调整线程池的大小和任务执行情况。

线程池广泛应用于各种多线程任务处理的场景,如网络服务器、数据库连接池、图像处理等。

线程池的缺点

虽然线程池在提高多线程任务处理效率和性能方面有很多优点,但也存在一些缺点:

  • 资源占用:线程池在运行过程中会占用一定的系统资源,包括内存和 CPU 资源。如果线程池的大小设置过大,可能会消耗过多的系统资源,影响其他程序的正常运行。
  • 调优难度:确定线程池的大小和配置参数需要一定的经验和调试,不同的应用场景可能需要不同的配置,而这种调优过程可能比较繁琐。
  • 任务拥堵:如果任务提交速度过快,超过了线程池的处理能力,会导致任务在工作队列中排队等待执行,可能造成任务响应时间延长或者任务被拒绝执行。
  • 任务依赖性:线程池中的线程都是独立的执行单元,无法直接控制线程间的依赖关系。如果有一些任务之间存在依赖关系,可能需要额外的同步机制来处理。
  • 线程泄漏:如果线程池中的线程没有适时地释放,可能会导致线程资源的泄漏,进而导致系统资源的浪费或者系统稳定性的下降。

综上所述,虽然线程池可以提高多线程任务处理的效率和性能,但在使用时仍需注意合理配置线程池大小和参数,并且需要注意任务提交的速度,以免出现资源浪费或性能下降的情况。

JUC 中的线程池

线程池类族

JUC 中线程池是通过 Executor 框架实现的,该框架中用到了 Executor,ExecutorService、ThreadPoolExecutor、Executors(工具类)这几个类。

线程池的创建

在 Java 中,创建线程池一共有 5 种方式,如下所示:

  • Executors.newFixedThreadPool(int i):创建一个拥有固定线程数的线程池

    • 执行长期的任务时,性能会高很多
    • 创建一个定长的线程池,可控制线程的最大并发数,超出的线程会在队列中等待
  • Executors.newSingleThreadExecutor():创建一个只有 1 个线程的单线程池

    • 适用于一个任务一个任务顺序执行的业务场景
    • 创建一个单线程化的线程池,它只会用唯一的一个工作线程来执行任务,保证所有任务按照指定顺序执行
  • Executors.newCacheThreadPool():创建一个可扩容的线程池

    • 执行很多短期异步的小程序或者负载较轻的服务器
    • 创建一个可缓存的线程池,如果线程长度超过处理需要,可灵活回收空闲线程,如无可回收,则创建新的线程
  • Executors.newScheduledThreadPool(int corePoolSize):创建一个支持定时以及周期性执行任务的线程池

    • corePoolSize 参数表示线程池的最大线程数
    • 支持以任务调度的方式,定时以及周期性地执行任务
  • Executors.newWorkStealingPool(int parallelism):创建持有足够线程的线程池来支持给定的并行级别

    • Java 8 新增的 API,支持使用当前机器上可用的处理器作为它的并行级别,而且会使用多个队列来减少竞争
    • ExecutorService newWorkStealingPool():该方法是上面方法的简化版本,如果当前机器有 4 个 CPU,则目标并行级别被设置为 4
    • 在 Java 中,可以通过调用 Runtime.getRuntime().availableProcessors() 方法来获取 CPU 的核心数

线程池创建的 3 种常用方式

  • Executors.newFixedThreadPool:创建一个拥有固定线程数的线程池
  • Executors.newSingleThreadExecutor():创建一个只有 1 个线程的单线程池
  • Executors.newCacheThreadPool():创建一个可扩容的线程池

API 的具体使用,首先需要使用 Executors 工具类创建线程池,这里演示创建一个拥有 5 个线程的线程池

1
2
3
4
5
6
7
8
// 创建一个拥有固定线程数的线程池
ExecutorService threadPool1 = Executors.newFixedThreadPool(5);

// 创建一个只有一个线程的单线程池
ExecutorService threadPool2 = Executors.newSingleThreadExecutor();

// 创建一个拥有 N 个线程的线程池,会根据调度情况创建合适数量的线程(即可扩容)
ExecutorService threadPool3 = Executors.newCacheThreadPool();

模拟 10 个用户来办理业务,每个用户就相当于是一个来自外部请求线程,需要调用 threadPool.execute() 方法执行业务,execute() 方法需要传入一个实现了 Runnable 接口的类作为参数

1
2
3
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " 给用户办理业务");
});

线程池使用完毕后,记得必须关闭掉

1
threadPool.shutdown()

线程池的使用

使用案例一
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class ThreadPoolDemo1 {

public static void main(String[] args) {
// 创建一个拥有固定线程数的线程池
ExecutorService threadPool = Executors.newFixedThreadPool(5);

try {
for (int i = 1; i <= 10; i++) {
final int index = i;
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " execute job " + index);
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}

}

程序运行输出的结果:

1
2
3
4
5
6
7
8
9
10
pool-1-thread-1 execute job 1
pool-1-thread-2 execute job 2
pool-1-thread-3 execute job 3
pool-1-thread-4 execute job 4
pool-1-thread-5 execute job 5
pool-1-thread-4 execute job 9
pool-1-thread-3 execute job 8
pool-1-thread-2 execute job 7
pool-1-thread-1 execute job 6
pool-1-thread-5 execute job 10
使用案例一
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class ThreadPoolDemo2 {

public static void main(String[] args) {
// 创建一个只有 1 个线程的单线程池
ExecutorService threadPool = Executors.newSingleThreadExecutor();

try {
for (int i = 1; i <= 10; i++) {
final int index = i;
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " execute job " + index);
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}

}

程序运行输出的结果:

1
2
3
4
5
6
7
8
9
10
pool-1-thread-1 execute job 1
pool-1-thread-1 execute job 2
pool-1-thread-1 execute job 3
pool-1-thread-1 execute job 4
pool-1-thread-1 execute job 5
pool-1-thread-1 execute job 6
pool-1-thread-1 execute job 7
pool-1-thread-1 execute job 8
pool-1-thread-1 execute job 9
pool-1-thread-1 execute job 10
使用案例二
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class ThreadPoolDemo3 {

public static void main(String[] args) {
// 创建一个可扩容的线程池
ExecutorService threadPool = Executors.newCachedThreadPool();

try {
Random random = new Random();
for (int i = 1; i <= 10; i++) {
final int index = i;
threadPool.execute(() -> {
// 模拟业务延迟
try {
TimeUnit.MILLISECONDS.sleep(random.nextInt(100));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " execute job " + index);
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}

}

程序运行输出的结果:

1
2
3
4
5
6
7
8
9
10
pool-1-thread-7 execute job 7
pool-1-thread-6 execute job 6
pool-1-thread-3 execute job 3
pool-1-thread-9 execute job 9
pool-1-thread-8 execute job 8
pool-1-thread-1 execute job 1
pool-1-thread-2 execute job 2
pool-1-thread-4 execute job 4
pool-1-thread-10 execute job 10
pool-1-thread-5 execute job 5

线程池的底层实现

查看 Executors.newFixedThreadPool()Executors.newSingleThreadExecutor() 的底层源码,能够发现都是使用了 ThreadPoolExecutor 类,还使用到了 LinkedBlockingQueue 链表阻塞队列。同时查看 Executors.newCacheThreadPool() 的底层源码,可以看到使用的是 SynchronousBlockingQueue 阻塞队列。

线程池的七大参数

1
2
3
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {

}

ThreadPoolExecutor 类是 ExecutorService 接口的一个实现,用于管理线程池。线程池在创建的时候,其构造函数包含 7 个参数,它们的含义如下:

  • corePoolSize:核心线程数,线程池中的常驻核心线程数

    • 在线程池创建后,当有请求任务过来,就会安排线程池中的线程去执行请求任务。
    • 当线程池中的线程数量超过 corePoolSize 后,就会把到达的请求任务放到工作队列中等待。
    • 在没有任务执行时,核心线程会一直存活在线程池中,即使是处于空闲状态,核心线程也不会被销毁。如果使用了无界队列,即使没有任务执行,核心线程也不会超时退出。
  • maximumPoolSize:最大线程数,线程池能够允许同时执行的最大线程数

    • 相当于扩容后的线程数,即这个线程池能容纳的最大线程数,此值必须大于等于 1。
    • 当线程池中的线程数量超过 corePoolSize,且工作队列已满时,会为新任务创建新的线程,直到达到最大线程数,超过最大线程数的任务将会被拒绝执行。
  • keepAliveTime:多余的空闲线程存活时间

    • 当线程池中的线程数量超过 corePoolSize 时,多余的空闲线程在被回收前等待新任务到来的最长时间。
    • 当空闲时间超过 keepAliveTime 时,多余的空闲线程会被销毁,直到只剩下 corePoolSize 个线程为止。
    • 在默认情况下,只有当线程池中的线程数量大于 corePoolSize 时,keepAliveTime 才会起作用。
  • unit:存活时间的单位

    • 指定 keepAliveTime 参数的时间单位,可以是秒、毫秒、分钟等。
  • workQueue:工作队列(任务队列),存放被提交但未被执行的任务

    • 线程池通过该队列来管理待执行的任务。
    • 常见的实现类包括 ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue 等。
  • threadFactory:用于创建新线程的工厂

    • 生成线程池中工作线程的线程工厂,一般用默认的即可。
    • 可以通过该参数自定义线程的创建方式,如设置线程的名称、优先级等。
  • handler:拒绝策略

    • 当任务无法被线程池执行,通常是由于线程池已经关闭,或者超过了最大线程数(maximumPoolSize),并且队列已满时,如何来拒绝执行任务的策略。

线程池最大的任务处理数

线程池最大的任务处理数 = 最大线程数(maximumPoolSize)+ 工作队列的长度

线程池的四大拒绝策略

Java 线程池提供了四大拒绝策略,用于处理无法接受新任务的情况,以下所有拒绝策略都实现了 RejectedExecutionHandler 接口。

  • AbortPolicy:这是默认拒绝策略,直接抛出 RejectedExcutionException 异常,以此通知调用者线程池无法接受新任务。
  • DiscardPolicy:直接丢弃任务,不予任何处理也不抛出异常,如果允许任务丢失,这是一种较好的方案。
  • DiscardOldestPolicy:丢弃工作队列中等待时间最长的任务(即最老的任务),然后尝试再次提交这个新任务,将其加入到工作队列中。
  • CallerRunsPolicy:既不会丢弃任务,也不会抛出异常,而是调用任务提交者的线程来执行这个新任务(即谁提交由谁来执行)。这样一来,提交任务的线程就会尝试去执行该任务,从而避免任务的丢失,并降低新任务的流量。

CallerRunsPolicy 策略详解

  • 上述提到的 CallerRunsPolicy 拒绝策略,如果线程池中的线程数达到最大线程数,工作队列也已经满了,并且任务提交者的线程也很忙,没时间去执行被拒绝的任务,那么线程池会怎么处理呢?
  • 如果采用 CallerRunsPolicy 拒绝策略,此时线程池中的线程数达到最大线程数,且工作队列也已经满了,这意味着没有空闲的线程来执行任务,并且工作队列也无法继续接收新的任务。在这种情况下,线程池会尝试调用任务提交者的线程来执行这个被拒绝的任务。然而,如果提交该任务的线程也忙于执行其他任务,没有空闲的时间去执行被拒绝的任务,那么线程池就会继续阻塞任务提交者,直到工作队列有空间或者有空闲线程可用来执行任务为止。

线程池的详细工作流程

工作流程图


工作流程详解

线程池的底层工作流程

  • (1) 在创建了线程池后,等待提交过来的任务
  • (2) 当调用 execute() 方法执行一个任务时,线程池会作出如下判断
    • a. 如果正在执行的线程数量小于 corePoolSize,那么马上创建线程执行这个任务
    • b. 如果正在执行的线程数量大于或等于 corePoolSize,那么将这个任务放入工作队列中
    • c. 如果这时候工作队列满了,并且正在执行的线程数量还小于 maximumPoolSize,那么还是创建非核心线程来执行这个任务
    • d. 如果这时候工作队列满了,并且正在执行的线程数量大于或等于 maximumPoolSize,那么线程池会启动拒绝策略来执行
  • (3) 当一个线程完成任务后,它会从工作队列中取下一个任务来执行
  • (4) 当一个线程空闲一定的时间(keepAliveTime) 后,线程池会作出如下判断
    • a. 如果当前执行的线程数大于 corePoolSize,那么这个线程就被销毁掉
    • b. 当线程池的所有任务都执行完成后,它最终会收缩到 corePoolSize 的大小

以顾客去银行办理业务为例,谈谈线程池的底层工作原理

  • (1) 最开始假设来了两个顾客,因为 corePoolSize 为 2,因此这两个顾客可以直接去窗口办理业务
  • (2) 后面又来了三个顾客,因为 corePoolSize 都已经被顾客占用了,因此只有去候客区等待,也就是在工作队列中等待
  • (3) 后面的人又陆陆续续来了,候客区可能不够用了,因此需要申请增加处理请求的临时窗口,这里的临时窗口指的是线程池中的线程数,以此来解决线程不够用的问题
  • (4) 假设受理窗口已经达到最大数 corePoolSize,但请求数还是不断增加,此时候客区和线程池都已经满了,并且新增的临时窗口也不够用,为了防止大量请求压垮线程池,需要开启拒绝策略
  • (5) 临时增加的线程会因为超过了最大存活时间,就会被销毁,最后线程数量从最大数削减到核心数

如何合理配置线程池的参数

面试题:生产环境中如何配置线程池的 corePoolSize 和 maximumPoolSize 参数?

首先需要弄清楚一点,业务系统是属于 CPU 密集型还是 I/O 密集型的。因为线程池的配置,是需要根据具体不同的业务来配置。

  • CPU 密集型

    • CPU 密集的意思是,该任务需要大量的运算,而且没有阻塞,CPU 一直全速运行
    • CPU 密集任务只有在真正的多核 CPU 上才可能得到加速执行(使用多线程),而在单核 CPU 上,无论启动几个模拟的多线程,该任务都不可能得到加速执行
    • CPU 密集型的任务,应该尽可能少的配置线程数量
    • 一般配置公式:CPU 核心数 + 1 个线程数
  • I/O 密集型

    • IO 密集型的意思是,该任务需要大量的 I/O 操作(网络、磁盘等),大部分线程都会被阻塞
    • 由于 I/O 密集型的任务线程并不是一直在执行任务,因此需要尽可能多配置线程数,如 CPU 核心数 * 2
    • 在单线程上执行 I/O 密集型的任务,会浪费大量的 CPU 运算能力,即花费大量时间在阻塞等待上;所以在 I/O 密集型任务中,使用多线程可以大大的加速程序的运行,这种加速主要就是利用了被浪费掉的阻塞时间
    • 参考配置公式:CPU 核心数 / (1 - 阻塞系数),阻塞系数在 0.8 ~ 0.9 左右

阅读扩展

为什么不用默认创建的线程池

重点面试题解析

面试题:线程池常用的创建方式有三种:固定数量的、单一线程的、可扩容的,那么在实际开发中,应该使用哪种方式?

答案是在生产环境中三种方式都不用,而是使用自己自定义的。那为什么不用 JDK 中 Executors 提供的呢?在阿里巴巴的 Java 开发手册中,有以下的线程池使用要求:

1
2
3
4
5
6
7
8
9
10
- 线程资源必须通过线程池提供,不允许在应用中自行显式创建线程
- 使用线程池的好处是可以减少在创建和销毁线程上所消耗的时间以及系统资源的开销,解决资源不足的问题
- 如果不使用线程池,有可能造成系统创建大量同类线程而导致内存消耗尽,或者导致 "过度切换" 的问题

- 线程池不允许使用 Executors 去创建,而是应用通过 ThreadPoolExecutor 的方式手动创建,这样的处理方式可以让开发者更加明确线程池的运行规则,规避资源耗尽的风险
- 使用 Executors 创建线程池的弊端如下
- FixedThreadPool 和 SingleThreadPool
- 允许的工作队列长度为 Integer.MAX_VALUE,可能会堆积大量的任务,从而导致 OOM
- CacheThreadPool 和 ScheduledThreadPool
- 允许创建的线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM

这里总结了一些不推荐使用默认 Executors 创建线程池的原因:

  • 无界队列:默认情况下,Executors 工厂方法创建的线程池使用的是无界队列。这意味着如果任务提交的速度超过了线程池处理任务的速度,那么队列会不断增长,最终可能导致内存耗尽或者 OutOfMemoryError。因此,在某些情况下,使用有界队列更安全,当工作队列已满时,可以根据需要采取适当的拒绝策略。

  • 线程生命周期管理:使用默认的 Executors 创建的线程池,线程的生命周期(如线程的创建、销毁等)可能由线程池自动管理,而这种自动管理可能不适合所有的应用场景。例如,在某些情况下,可能需要对线程的创建和销毁进行更精细的控制,以避免资源泄露或者其他问题。

  • 线程池的大小:默认的 Executors 创建的线程池大小可能不符合实际需求。例如,如果任务量非常大,但是线程池的大小较小,那么可能会导致任务排队等待执行,从而影响系统的性能。因此,在实际应用中,通常需要根据实际情况来调整线程池的大小。

手写线程池的实现

因为 Executors 创建的部分线程池,底层是使用 LinkBlockingQueue 作为阻塞队列的;而 LinkBlockingQueue 虽然是有界的,但它的上限是 Integer.MAX_VALUE,大概有 20 多亿之大,可以视为是无界了。这意味着可能会堆积大量的任务,从而导致 OOM。因此需要使用 ThreadPoolExecutor 手动创建线程池,然后指定阻塞队列的大小。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* 创建一个核心线程数为 2,最大线程数为 5,并且工作队列大小为 3 的线程池
*/
public class ThreadPoolDemo4 {

public static void main(String[] args) {
final int corePoolSize = 2;
final int maximumPoolSize = 5;
final int queueSize = 3;
final long keepAliveTime = 10L;

ExecutorService executorService = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(queueSize),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());

try {
for (int i = 1; i <= 10; i++) {
executorService.execute(() -> {
System.out.println(Thread.currentThread().getName() + " execute");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
executorService.shutdown();
}
}

}

线程通信

线程通信之生产者消费者模式

同步版的实现

面试题目:一个初始值为零的变量,两个线程对其交替操作,一个线程加 1,一个线程减 1,重复 5 轮操作。

这里将给出基于 synchronizedwait()notifyAll() 实现线程间通信的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
class ShareData1 {

private int number = 0;

public synchronized void increment() throws Exception {
try {
// 使用 while 循环,而不是使用 if 判断,否则可能会导致虚假唤醒的现象
while (number != 0) {
// 等待,不能往下执行
this.wait();
}
number++;
System.out.println(Thread.currentThread().getName() + " " + number);
// 通知唤醒
this.notifyAll();
} catch (Exception e) {
e.printStackTrace();
}
}

public synchronized void decrement() throws Exception {
try {
// 使用 while 循环,而不是使用 if 判断,否则可能会导致虚假唤醒的现象
while (number == 0) {
// 等待,不能往下执行
this.wait();
}
number--;
System.out.println(Thread.currentThread().getName() + " " + number);
// 通知唤醒
this.notifyAll();
} catch (Exception e) {
e.printStackTrace();
}
}

}

public class ProdConsumerDemo1 {

public static void main(String[] args) {
ShareData1 shareData = new ShareData1();

// 加 1
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
shareData.increment();
} catch (Exception exception) {
exception.printStackTrace();
}
}
}, "T1").start();

// 减 1
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
shareData.decrement();
} catch (Exception exception) {
exception.printStackTrace();
}
}
}, "T2").start();
}

}

程序运行输出的结果:

1
2
3
4
5
6
7
8
9
10
T1 1
T2 0
T1 1
T2 0
T1 1
T2 0
T1 1
T2 0
T1 1
T2 0

可重入锁版的实现

面试题目:一个初始值为零的变量,两个线程对其交替操作,一个线程加 1,一个线程减 1,重复 5 轮操作。

这里将给出基于 Lock、await()signalAll() 实现线程间通信的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class ShareData2 {

private int number = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();

public void increment() throws Exception {
lock.lock();
try {
// 使用 while 循环,而不是使用 if 判断,否则可能会导致虚假唤醒的现象
while (number != 0) {
// 等待,不能往下执行
condition.await();
}
number++;
System.out.println(Thread.currentThread().getName() + " " + number);
// 通知唤醒
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

public void decrement() throws Exception {
lock.lock();
try {
// 使用 while 循环,而不是使用 if 判断,否则可能会导致虚假唤醒的现象
while (number == 0) {
// 等待,不能往下执行
condition.await();
}
number--;
System.out.println(Thread.currentThread().getName() + " " + number);
// 通知唤醒
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

}

public class ProdConsumerDemo2 {

public static void main(String[] args) {
ShareData2 shareData = new ShareData2();

// 加 1
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
shareData.increment();
} catch (Exception exception) {
exception.printStackTrace();
}
}
}, "T1").start();

// 减 1
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
shareData.decrement();
} catch (Exception exception) {
exception.printStackTrace();
}
}
}, "T2").start();
}

}

程序运行输出的结果:

1
2
3
4
5
6
7
8
9
10
T1 1
T2 0
T1 1
T2 0
T1 1
T2 0
T1 1
T2 0
T1 1
T2 0

阻塞队列版的实现

面试题:一个初始值为零的变量,两个线程对其交替操作,一个加 1,一个减 1,重复 5 轮

这里将给出基于 volatile、CAS、AtomicInteger、BlockQueue 实现线程间通信的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ShareData3 {

// 使用 volatile 保证可见性
private volatile boolean FLAG = true;
// 使用原子包装类
private AtomicInteger atomicInteger = new AtomicInteger(0);
// 阻塞队列
private BlockingQueue<String> blockingQueue = null;

public ShareData3(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
}

public void stop() {
FLAG = false;
}

public void produce() throws Exception {
String data = null;
boolean result = false;
// 使用 while 循环,而不是使用 if 判断,否则可能会导致虚假唤醒的现象
while (FLAG) {
data = atomicInteger.incrementAndGet() + "";
// 生产
result = blockingQueue.offer(data, 2L, TimeUnit.SECONDS);
if (result) {
System.out.println(Thread.currentThread().getName() + " produce success : " + data);
} else {
System.out.println(Thread.currentThread().getName() + " produce failed");
}
TimeUnit.SECONDS.sleep(1);
}
System.out.println("produce stop");
}

public void consume() throws Exception {
String result = null;
// 使用 while 循环,而不是使用 if 判断,否则可能会导致虚假唤醒的现象
while (FLAG) {
// 消费,如果队列为空,当前线程会阻塞等待 2 秒
result = blockingQueue.poll(2L, TimeUnit.SECONDS);
if (null == result) {
FLAG = false;
System.out.println("consume timeout, exit");
return;
}
System.out.println(Thread.currentThread().getName() + " consume success : " + result);
}
System.out.println("consume stop");
}

}

public class ProdConsumerDemo3 {

public static void main(String[] args) {
ShareData3 shareData = new ShareData3(new ArrayBlockingQueue<>(10));

// 启动生产者线程
new Thread(() -> {
try {
shareData.produce();
} catch (Exception exception) {
exception.printStackTrace();
}
}, "Producer thread").start();

// 启动消费者线程
new Thread(() -> {
try {
shareData.consume();
} catch (Exception exception) {
exception.printStackTrace();
}
}, "Consumer thread").start();

// 等待 5 秒后,生产和消费线程停止,主线程结束
try {
TimeUnit.SECONDS.sleep(5);
shareData.stop();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}

程序运行输出的结果:

1
2
3
4
5
6
7
8
9
10
11
12
Producer thread produce success : 1
Consumer thread consume success : 1
Producer thread produce success : 2
Consumer thread consume success : 2
Producer thread produce success : 3
Consumer thread consume success : 3
Producer thread produce success : 4
Consumer thread consume success : 4
Producer thread produce success : 5
Consumer thread consume success : 5
produce stop
consume timeout, exit

线程通信之按顺序执行多个线程

面试题目:多个线程之间按顺序调用,三个线程的调用顺序是 A -> B -> C ,具体要求如下:

1
2
3
4
A 打印 5 次,B 打印 10 次,C 打印 15 次
A 打印 5 次,B 打印 10 次,C 打印 15 次
......
重复 10 轮

这样的场景使用 synchronized 来完成的话,会非常的困难,但是使用 Lock 就非常方便了,也就是需要实现一个链式唤醒的操作。这里将给出利用锁绑定多个条件(Condition)来实现线程间通信的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class ShareData4 {

private int number = 1;

// 创建一个可重入锁
private Lock lock = new ReentrantLock();

// 创建三个条件,相当于三把备用钥匙
private Condition condition1 = lock.newCondition();
private Condition condition2 = lock.newCondition();
private Condition condition3 = lock.newCondition();

public void print5() {
lock.lock();
try {
// 判断
while (number != 1) {
condition1.await();
}
// 干活
for (int i = 1; i <= 5; i++) {
System.out.println(Thread.currentThread().getName() + " " + i);
}
// 通知
number = 2;
condition2.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

public void print10() {
lock.lock();
try {
// 判断
while (number != 2) {
condition2.await();
}
// 干活
for (int i = 1; i <= 10; i++) {
System.out.println(Thread.currentThread().getName() + " " + i);
}
// 通知
number = 3;
condition3.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

public void print15() {
lock.lock();
try {
// 判断
while (number != 3) {
condition3.await();
}
// 干活
for (int i = 1; i <= 15; i++) {
System.out.println(Thread.currentThread().getName() + " " + i);
}
// 通知
number = 1;
condition1.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

}

public class ProdConsumerDemo4 {

public static void main(String[] args) {
ShareData4 shareData = new ShareData4();

new Thread(() -> {
for (int i = 0; i < 10; i++) {
shareData.print5();
}
}, "A").start();

new Thread(() -> {
for (int i = 0; i < 10; i++) {
shareData.print10();
}
}, "B").start();

new Thread(() -> {
for (int i = 0; i < 10; i++) {
shareData.print15();
}
}, "C").start();
}

}

程序运行输出的结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
A 1
A 2
A 3
A 4
A 5
B 1
B 2
B 3
B 4
B 5
B 6
B 7
B 8
B 9
B 10
C 1
C 2
C 3
C 4
C 5
C 6
C 7
C 8
C 9
C 10
C 11
C 12
C 13
C 14
C 15
......

Callable 接口

创建线程有三种常见的实现方式:

  • 继承 Thread 类,并重写 run() 方法,其返回值是 void
  • 实现 Runnable 接口,并实现 run() 方法,其返回值是 void
  • 实现 Callable 接口,并实现 call() 方法,可以指定其返回值和抛出异常

Callable 接口的介绍

Callable 接口可以让线程执行完成后,返回执行结果。Callable 接口适用于批处理业务的场景,比如转账的时候,需要返回转账结果的状态码,记录本次操作是成功还是失败。

1
2
3
4
5
6
7
8
class MyCallable implements Callable<Integer> {

@Override
public Integer call() throws Exception {
return 1024;
}

}

然后将实现 Callable 接口的 MyCallable 类包装起来。这里需要用到的是 FutureTask 类,它实现了 Runnable 和 Future 接口,并且还需要传递一个实现 Callable 接口的实现类作为构造函数参数。

1
FutureTask<Integer> futureTask = new FutureTask<>(new MyCallable());

然后使用 Thread 类进行实例化,传入实现 Runnabnle 接口的 FutureTask 类

1
2
Thread t1 = new Thread(futureTask, "A");
t1.start();

最后通过 FutureTask.get() 获取到返回值,这里的 get() 方法会阻塞线程的执行,直到 Callable 的任务执行完毕

1
Integer result = futureTask.get();

特别注意的是,当要获得 Callable 的计算结果时,如果任务线程没有计算完成就要去等待,也就是会导致线程阻塞,直到计算完成为止。简而言之,调用 FutureTask.get() 方法会阻塞线程的执行,直到 Callable 的任务执行完毕,因此往往会将 FutureTask.get() 方法写在最后面,这样就不会阻塞主线程。值得一提的是,也可以使用下面的算法,使用类似于自旋锁的方式来判断任务是否执行完毕。

1
2
3
while(!futureTask.isDone()) {
sleep(100);
}

Callable 接口的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class MyCallable implements Callable<Integer> {

@Override
public Integer call() throws Exception {
return 1024;
}

}

public class CallableDemo {

public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建 Callable 实例
MyCallable callable = new MyCallable();
// 用 FutureTask 类来包装 Callable 实例
FutureTask<Integer> futureTask = new FutureTask<>(callable);
// 创建线程,执行 FutureTask
new Thread(futureTask, "T1").start();
// 会阻塞等待任务完成((往往会写在最后面))
Integer result = futureTask.get();
System.out.println(result);
}

}

程序运行输出的结果:

1
1024

Callable 接口的注意事项

多个线程执行同一个 FutureTask 的时候,最终只会执行一次任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class MyCallable implements Callable<Integer> {

@Override
public Integer call() throws Exception {
System.out.println(Thread.currentThread().getName() + " invoke call()");
return 1024;
}

}

public class CallableDemo {

public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建 Callable 实例
MyCallable callable = new MyCallable();

// 用 FutureTask 类来包装 Callable 实例
FutureTask<Integer> futureTask = new FutureTask<>(callable);

// 创建多个线程,执行同一个 FutureTask
new Thread(futureTask, "T1").start();
new Thread(futureTask, "T2").start();

// 会阻塞等待任务完成((往往会写在最后面))
Integer result = futureTask.get();
System.out.println(result);
}

}

程序运行输出的结果:

1
2
T1 invoke call()
1024

如果需要多个线程执行多次任务,那么就使用多个不同的 FutureTask。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class MyCallable implements Callable<Integer> {

@Override
public Integer call() throws Exception {
System.out.println(Thread.currentThread().getName() + " invoke call()");
return 1024;
}

}

public class CallableDemo {

public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建 Callable 实例
MyCallable callable = new MyCallable();

// 定义多个 FutureTask 类,用来包装 Callable 实例
FutureTask<Integer> futureTask1 = new FutureTask<>(callable);
FutureTask<Integer> futureTask2 = new FutureTask<>(callable);

// 创建多个线程,执行多个不同的 FutureTask
new Thread(futureTask1, "T1").start();
new Thread(futureTask2, "T2").start();

// 会阻塞等待任务完成((往往会写在最后面))
Integer result1 = futureTask1.get();
Integer result2 = futureTask2.get();
System.out.println(result1);
System.out.println(result2);

}

}

程序运行输出的结果:

1
2
3
4
T1 invoke call()
T2 invoke call()
1024
1024