多线程详解
Ⅰ、进程和线程
一、进程和线程的区别?
简单的比喻:进程=火车,线程=车厢
- 线程在进程下行进(单纯的车厢无法运行)
- 一个进程可以包含多个线程(一辆火车可以有多个车厢)
- 不同进程间数据很难共享(一辆火车上的乘客很难换到另外一辆火车,比如站点换乘)
- 同一进程下不同线程间数据很易共享(A车厢换到B车厢很容易)
- 进程要比线程消耗更多的计算机资源(采用多列火车相比多个车厢更耗资源)
- 进程间不会相互影响,一个线程挂掉将导致整个进程挂掉(一列火车不会影响到另外一列火车,但是如果一列火车上中间的一节车厢着火了,将影响到所有车厢)
- 进程可以拓展到多机,进程最多适合多核(不同火车可以开在多个轨道上,同一火车的车厢不能在行进的不同的轨道上)
- 进程使用的内存地址可以上锁,即一个线程使用某些共享内存时,其他线程必须等它结束,才能使用这一块内存。(比如火车上的洗手间)-"互斥锁"
- 进程使用的内存地址可以限定使用量(比如火车上的餐厅,最多只允许多少人进入,如果满了需要在门口等,等有人出来了才能进去)-“信号量”
进一步总结:
- 进程是一个资源的容器,为进程里的所有线程提供共享资源,是对程序的一种静态描述
- 线程是计算机最小的调度和运行单位,是对程序的一种动态描述
例如:
开个QQ,开了一个进程;开了迅雷,开了一个进程。 在QQ的这个进程里,传输文字开一个线程、传输语音开了一个线程、弹出对话框又开了一个线程。
所以运行某个软件,相当于开了一个进程。在这个软件运行的过程里(在这个进程里),多个工作支撑的完成QQ的运行,那么这“多个工作”分别有一个线程。
所以一个进程管着多个线程。
1、进程
程序由指令和数据组成,但这些指令要运行,数据要读写,就必须将指令加载至 CPU,数据加载至内存。在 指令运行过程中还需要用到磁盘、网络等设备。进程就是用来加载指令、管理内存、管理 IO 的
当一个程序被运行,从磁盘加载这个程序的代码至内存,这时就开启了一个进程。
进程就可以视为程序的一个实例。大部分程序可以同时运行多个实例进程(例如记事本、画图、浏览器 等),也有的程序只能启动一个实例进程(例如网易云音乐、360 安全卫士等)
2、线程
一个进程之内可以分为一到多个线程。
一个线程就是一个指令流,将指令流中的一条条指令以一定的顺序交给 CPU 执行
Java 中,线程作为小调度单位,进程作为资源分配的小单位。 在 windows 中进程是不活动的,只是作 为线程的容器
3、进程和线程的区别
进程是资源分配的最小单位,线程是CPU调度的最小单位
进程基本上相互独立的,而线程存在于进程内,是进程的一个子集
进程拥有共享的资源,如内存空间等,供其内部的线程共享
进程间通信较为复杂
- 同一台计算机的进程通信称为 IPC(Inter-process communication)
- 不同计算机之间的进程通信,需要通过网络,并遵守共同的协议,例如 HTTP
线程通信相对简单,因为它们共享进程内的内存,一个例子是多个线程可以访问同一个共享变量
线程更轻量,线程上下文切换成本一般上要比进程上下文切换低
Ⅱ、Java线程
一、创建线程的方式
1、继承Thread
只有调用**start()**方法才算开启一个线程。否则只是代表创建一个Thread实例。
public class MyThread extends Thread{
@Override
public void run() {
for (int i = 0; i < 10; i++) {
// Thread.currentThread().getName()获取当前线程名称
System.out.println(Thread.currentThread().getName()+":"+i);
}
}
}
public class T1 {
public static void main(String[] args) {
// ①继承Thread类 重写run方法
MyThread myThread=new MyThread();
// 设置线程的名称
myThread.setName("myThread");
myThread.start();
// ②使用匿名内部类继承Thread
Thread thread1=new Thread("thread1"){
@Override
public void run() {
for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread().getName()+":"+i);
}
}
};
thread1.start();
}
}
2、实现Runnable接口
public class MyRunnable implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread().getName()+":"+i);
}
}
}
public class T1 {
public static void main(String[] args) {
// ①传入Runnable接口的实现类
Thread thread=new Thread(new MyRunnable(), "myRunnable");
thread.start();
// ②使用匿名内部类实现Runnable接口
Thread thread1=new Thread(new Runnable(){
@Override
public void run() {
for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread().getName()+":"+i);
}
}
},"myRunnable2");
thread1.start();
}
}
3、实现Callable接口
Callable和Runnable的区别是,Callable接口可以拿到线程处理后的返回值给其他线程使用
public class MyCallable implements Callable<Integer> {
@Override
public Integer call() throws Exception {
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName());
return 100;
}
public static void main(String[] args) {
// 新建任务
// ①通过实现Callable接口
FutureTask<Integer> task=new FutureTask<>(new MyCallable());
// ②使用匿名内部类实现Callable实现接口
FutureTask<String> task1=new FutureTask<>(()->{
System.out.println(Thread.currentThread().getName());
return "hello java";
});
// FutureTask也是实现Runnable接口的
Thread thread=new Thread(task, "MyCallable");
Thread thread1=new Thread(task1,"MyCallable2");
// 开启此线程
thread.start();
thread1.start();
// 主线程获取返回值
try {
// 注意:主线程会一直阻塞在这里,直到get拿到此返回值
Integer t1 = task.get();
String t2 = task1.get();
System.out.println(t1);
System.out.println(t2);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
4、Thread和Runnable的关系
Thread 是把线程和任务合并在了一起
Runnable是把线程和任务分开了
用 Runnable 更容易与线程池等高级 API 配合
用 Runnable 让任务类脱离了 Thread 继承体系,更灵活
三、原理之线程运行
1、栈与栈帧
栈先进后出
Java Virtual Machine Stacks (Java 虚拟机栈)
我们都知道 JVM 中由堆、栈、方法区所组成,其中栈内存是给谁用的呢?其实就是线程,每个线程启动后,虚拟 机就会为其分配一块栈内存。
每个线程都有自己的一个独立的栈帧,栈帧之间互不影响。
每个栈由多个栈帧(Frame)组成,对应着每次方法调用时所占用的内存
每个线程只能有一个活动栈帧,对应着当前正在执行的那个方法
idea中每个方法执行的栈帧,我们可以看到main方法首先入栈,然后method1和method2相继压栈,最后mehtod2和mehtod1执行完后,从上到下出栈
图解分析:方法执行在jvm中内存区域分析:
方法相继出栈:
2、线程上下文切换
因为以下一些原因导致 cpu 不再执行当前的线程,转而执行另一个线程的代码
线程的 cpu 时间片用完
垃圾回收
有更高优先级的线程需要运行
线程自己调用了 sleep、yield、wait、join、park、synchronized、lock 等方法
当 Context Switch 发生时,需要由操作系统保存当前线程的状态,并恢复另一个线程的状态,Java 中对应的概念 就是程序计数器(Program Counter Register),它的作用是记住下一条 jvm 指令的执行地址,是线程私有的
- 状态包括程序计数器、虚拟机栈中每个栈帧的信息,如局部变量、操作数栈、返回地址等
- Context Switch 频繁发生会影响性能
四、Thread常用方法
方法名 | static | 功能说明 | 注意 |
---|---|---|---|
start() | 启动一个新线 程,在新的线程 运行 run 方法 中的代码 | start 方法只是让线程进入就绪,里面代码不一定立刻 运行(CPU 的时间片还没分给它)。每个线程对象的 start方法只能调用一次,如果调用了多次会出现 IllegalThreadStateException | |
run() | 新线程启动后会 调用的方法 | 如果在构造 Thread 对象时传递了 Runnable 参数,则 线程启动后会调用 Runnable 中的 run 方法,否则默 认不执行任何操作。但可以创建 Thread 的子类对象, 来覆盖默认行为 | |
join() | 等待线程运行结 束 | ||
join(long n) | 等待线程运行结 束,多等待 n 毫秒 | ||
getId() | 获取线程长整型 的 id | id 唯一 | |
getName() | 获取线程名 | ||
setName(String name) | 修改线程名 | ||
getPriority() | 获取线程优先级 | ||
setPriority(int) | 修改线程优先级 | java中规定线程优先级是1~10 的整数,较大的优先级 能提高该线程被 CPU 调度的机率 | |
getState() | 获取线程状态 | Java 中线程状态是用 6 个 enum 表示,分别为: NEW, RUNNABLE, BLOCKED, WAITING, TIMED_WAITING, TERMINATED | |
isInterrupted() | 判断是否被打 断 | 不会清除 打断标记 | |
isAlive() | 线程是否存活 (还没有运行完 毕) | ||
interrupt() | 打断线程 | 如果被打断线程正在 sleep,wait,join 会导致被打断 的线程抛出 InterruptedException,并清除 打断标记 ;如果打断的正在运行的线程,则会设置 打断标记 ;park 的线程被打断,也会设置 打断标记 | |
interrupted() | static | 判断当前线程是 否被打断 | 会清除 打断标记 |
currentThread() | static | 获取当前正在执 行的线程 | |
sleep(long n) | static | 让当前执行的线 程休眠n毫秒, 休眠时让出 cpu 的时间片给其它 线程 | |
yield() | static | 提示线程调度器 让出当前线程对 CPU的使用 | 主要是为了测试和调试 |
五、sleep与yield
1、sleep
①调用 sleep 会让当前线程从 Running 进入 Timed Waiting 状态(阻塞)
@Slf4j
public class T1 {
public static void main(String[] args) {
Thread t1=new Thread(()->{
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 新建的线程t1处于new状态
// NEW
log.info("{}",t1.getState());
t1.start();
// 调用start方法后线程t1处于可运行状态
// RUNNABLE
log.info("{}",t1.getState());
try {
// 主线程休眠1秒
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 此时主线程执行到这的时候,t1线程处于休眠状态
// 即此时t1处于超时等待
// TIMED_WAITING
log.info("{}",t1.getState());
}
}
②其它线程可以使用 interrupt 方法打断正在睡眠的线程,这时 sleep 方法会抛出 InterruptedException
@Slf4j
public class T2 {
public static void main(String[] args) throws InterruptedException {
Thread t1=new Thread(()->{
try {
log.info("start sleep....");
Thread.sleep(3000);
} catch (InterruptedException e) {
log.info("i wake up");
e.printStackTrace();
}
});
t1.start();
Thread.sleep(1000);
// 打断正在休眠的进程 抛出InterruptedException异常
t1.interrupt();
}
}
17:02:56.329 [Thread-0] INFO org.lc.sleep_yield.T2 - start sleep....
17:02:57.325 [Thread-0] INFO org.lc.sleep_yield.T2 - i wake up
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at org.lc.sleep_yield.T2.lambda$main$0(T2.java:18)
at java.lang.Thread.run(Thread.java:748)
③睡眠结束后的线程未必会立刻得到执行
睡眠过后,还是要等cpu来分配时间片给该线程来执行
④建议用 TimeUnit 的 sleep 代替 Thread 的 sleep 来获得更好的可读性
TimeUnit.DAYS
TimeUnit.HOURS
TimeUnit.MINUTES
TimeUnit.SECONDS
@Slf4j
public class T3 {
public static void main(String[] args) {
log.info("start sleep");
try {
// 格式化睡眠时间 1 秒
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("end sleep");
}
}
2、yield
Thread.yield();
①调用 yield 会让当前线程从 Running 进入 Runnable 就绪状态,然后调度执行其它线程
②具体的实现依赖于操作系统的任务调度器
public class T4 {
public static void main(String[] args) {
Thread t1=new Thread(()->{
int count = 0;
for (; ; ) {
// 设置yield() 让出cpu
Thread.yield();
System.out.println("t1----:"+count++);
}
},"t1");
Thread t2=new Thread(()->{
int count = 0;
for (; ; ) {
System.out.println("t2----:"+count++);
}
},"t2");
t1.start();
t2.start();
}
}
......
t1----:6577
t1----:6578
t1----:6579
t2----:23524
t2----:23525
t2----:23526
....
我们明显可以发现,yield是有一定作用来让出cpu时间片给其他线程来执行的
3、sleep与yield的区别
当运行中的线程调用yield方法时,会直接进入从运行状态进入就绪状态,然后等待cpu时间片的调用。
当运行中的线程调用sleep方法时,线程会进入一个timed waiting阻塞状态,只有该sleep时间片过了,才有机会让cpu调用此线程
4、线程优先级
线程优先级会提示(hint)调度器优先调度该线程,但它仅仅是一个提示,调度器可以忽略它
如果 cpu 比较忙,那么优先级高的线程会获得更多的时间片,但 cpu 闲时,优先级几乎没作用
setPriority(int newPriority)
范围:1-10
/**
* 线程最小的优先级.
*/
public final static int MIN_PRIORITY = 1;
/**
* 线程默认的优先级.
*/
public final static int NORM_PRIORITY = 5;
/**
* 线程最大的优先级
*/
public final static int MAX_PRIORITY = 10;
public class T4 {
public static void main(String[] args) {
Thread t1=new Thread(()->{
int count = 0;
for (; ; ) {
System.out.println("t1----:"+count++);
}
},"t1");
Thread t2=new Thread(()->{
int count = 0;
for (; ; ) {
System.out.println("t2----:"+count++);
}
},"t2");
// 设置t1线程最大的优先级
t1.setPriority(Thread.MAX_PRIORITY);
// 设置t2线程最小的优先级
t2.setPriority(Thread.MIN_PRIORITY);
t1.start();
t2.start();
}
}
...
t1----:34178
t1----:34179
t2----:28259
t2----:28260
...
我们可以发现优先级高的线程比优先级高的有明显
5、设置线程优先级和yield的区别
都是可以尽量让出cpu时间片给其他线程执行,但是还是取决于cpu的执行能力和cpu是否闲忙的程度
6、sleep应用:使用sleep限制对cpu的占用
在没有利用 cpu 来计算时,不要让 while(true) 空转浪费 cpu,这时可以使用 yield 或 sleep 来让出 cpu 的使用权 给其他程序
可以用 wait 或 条件变量达到类似的效果
不同的是,后两种都需要加锁,并且需要相应的唤醒操作,一般适用于要进行同步的场景
sleep 适用于无需锁同步的场景
在单核cpu情况下,如果我们运行一下代码:
public class T5 {
public static void main(String[] args) {
new Thread(()->{
while (true) {
System.out.println("hello world! ");
}
}).start();
}
}
我们会发现cpu会出现占满的情况。
修改以上代码:
public class T5 {
public static void main(String[] args) {
new Thread(()->{
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("hello world! ");
}
}).start();
}
}
我们可以发现加上sleep后cpu占用率明显下降。
六、join的使用
join会阻塞当前正在运行的代码。直到该join对应的线程执行完毕。 join底层其实就是wait
1、基本应用
@Slf4j
public class T6 {
static int a=0;
public static void main(String[] args) throws InterruptedException {
log.info("开始");
Thread t1 = new Thread(() -> {
try {
log.info("开始");
Thread.sleep(1000);
log.info("结束");
} catch (InterruptedException e) {
e.printStackTrace();
}
a=10;
}, "t1");
t1.start();
// 主线程等待该线程结束 然后才能执行后面的代码 会阻塞当前主线程
t1.join();
log.info("结果为:{}",a);
log.info("结束");
}
}
10:14:00.197 [main] INFO org.lc.sleep_yield.T6 - 开始
10:14:00.240 [t1] INFO org.lc.sleep_yield.T6 - 开始
10:14:01.240 [t1] INFO org.lc.sleep_yield.T6 - 结束
10:14:01.240 [main] INFO org.lc.sleep_yield.T6 - 结果为:10
10:14:01.241 [main] INFO org.lc.sleep_yield.T6 - 结束
2、同步应用-等待多个线程结果
以调用方角度来讲,如果
- 需要等待结果返回,才能继续运行就是同步
- 不需要等待结果返回,就能继续运行就是异步
@Slf4j
public class T7 {
static int a=0;
static int b=0;
public static void main(String[] args) throws InterruptedException {
test();
}
private static void test() throws InterruptedException {
Thread t1=new Thread(()->{
try {
Thread.sleep(1000);
a=10;
} catch (InterruptedException e) {
e.printStackTrace();
}
},"t1");
Thread t2=new Thread(()->{
try {
Thread.sleep(2000);
b=20;
} catch (InterruptedException e) {
e.printStackTrace();
}
},"t2");
long start = System.currentTimeMillis();
t1.start();
t2.start();
t1.join();
t2.join();
long end = System.currentTimeMillis();
log.info("a:{} b:{} 花费时间:{}",a,b,end-start);
}
}
10:25:45.948 [main] INFO org.lc.sleep_yield.T7 - a:10 b:20 花费时间:2000
我们可以发现,为啥不是3秒呢?
因为t1线程在跑的同时,t2线程也在跑。当等待t1线程跑完时,经过了1秒,到t2线程时,t2已经跑了1秒,所以现在只用等待1s即可,所以总共为2s
3、限时同步
join(long n) 等待线程执行毫秒
@Slf4j
public class T7 {
static int a=0;
public static void main(String[] args) throws InterruptedException {
test1();
}
private static void test1() throws InterruptedException {
Thread t1=new Thread(()->{
try {
Thread.sleep(3000);
a=10;
} catch (InterruptedException e) {
e.printStackTrace();
}
},"t1");
t1.start();
t1.join(1500);
log.info("{}",a);
}
}
10:41:59.091 [main] INFO org.lc.sleep_yield.T7 - 0
- 我们发现当join设置的小于t1线程的执行时间时,t1线程还没有执行完,主线程即不再等待,将执行join后面的代码
@Slf4j
public class T7 {
static int a=0;
public static void main(String[] args) throws InterruptedException {
test1();
}
private static void test1() throws InterruptedException {
Thread t1=new Thread(()->{
try {
Thread.sleep(2000);
a=10;
} catch (InterruptedException e) {
e.printStackTrace();
}
},"t1");
long s=System.currentTimeMillis();
t1.start();
t1.join(4000);
long e=System.currentTimeMillis();
log.info("{}, 时间:{}",a,e-s);
}
}
10:46:03.790 [main] INFO org.lc.sleep_yield.T7 - 10, 时间:2000
- 我们发现当设置的join时间大于t1线程的执行时间,则t1线程会提前终止join。
七、interrupt使用
注意:
interrupt()
使线程中断 。实例方法。isInterrupted()
判断线程是否被打断,不会清除标记 。该方法为实例方法interrupted()
判断线程是否被打断,会清除标记。该方法为静态方法
1、打断 sleep,wait,join 的阻塞线程
打断sleep,wait,join时的阻塞线程,会清除interrupt标记,即isInterrupted()为false
这几个方法都会让线程进入阻塞状态 打断 sleep 的线程, 会清空打断状态,以 sleep 为例(打断正在睡眠的线程):
@Slf4j
public class T1 {
public static void main(String[] args) throws InterruptedException {
Thread t1=new Thread(()->{
try {
log.info("开始睡眠");
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
t1.start();
Thread.sleep(1000);
log.info("interrupt");
t1.interrupt();
log.info("打断状态:{}",t1.isInterrupted());
}
}
11:38:13.518 [Thread-0] INFO org.lc.interrupt.T1 - 开始睡眠
11:38:14.518 [main] INFO org.lc.interrupt.T1 - interrupt
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at org.lc.interrupt.T1.lambda$main$0(T1.java:18)
at java.lang.Thread.run(Thread.java:748)
11:38:14.518 [main] INFO org.lc.interrupt.T1 - 打断状态:false
2、打断正常运行的线程
@Slf4j
public class T2 {
public static void main(String[] args) throws InterruptedException {
Thread t1=new Thread(()->{
while (true) {
}
},"t1");
t1.start();
// 保证t1线程能够在main线程执行打断方法时运行
Thread.sleep(1000);
log.info("interrupt");
t1.interrupt();
log.info("打断状态:{}",t1.isInterrupted());
}
}
11:48:32.775 [main] INFO org.lc.interrupt.T2 - interrupt
11:48:32.778 [main] INFO org.lc.interrupt.T2 - 打断状态:true
上述程序未终止
对于正常运行的线程,如果此时t1线程被告知需要打断,则需要在线程内部手动终止程序。否则会一直运行。
@Slf4j
public class T2 {
public static void main(String[] args) throws InterruptedException {
Thread t1=new Thread(()->{
while (true) {
// 如果当前线程被告知要打断,即设置了interrupte标记为true
if(Thread.currentThread().isInterrupted()){
log.info("退出循环!");
break;
}
}
},"t1");
t1.start();
// 保证t1线程能够在main线程执行打断方法时运行
Thread.sleep(1000);
log.info("interrupt");
t1.interrupt();
log.info("打断状态:{}",t1.isInterrupted());
}
}
11:51:59.831 [main] INFO org.lc.interrupt.T2 - interrupt
11:51:59.834 [t1] INFO org.lc.interrupt.T2 - 退出循环!
11:51:59.834 [main] INFO org.lc.interrupt.T2 - 打断状态:true
3、优雅的利用interrupt打断线程
①错误思路
使用线程对象的 stop() 方法停止线程
- stop 方法会真正杀死线程,如果这时线程锁住了共享资源,那么当它被杀死后就再也没有机会释放锁, 其它线程将永远无法获取锁
使用 System.exit(int) 方法停止线程
- 目的仅是停止一个线程,但这种做法会让整个程序都停止
②终止模式之两阶段终止模式(Two Phase Termination )
模拟应用监控,若出现应用出现异常则停止监控。手动点击停止,也停止监控
③利用 isInterrupted()
interrupt 可以打断正在执行的线程,无论这个线程是在 sleep,wait,还是正常运行
@Slf4j
public class TwoPhaseTermination {
// 监视器线程
private static Thread monitor;
/**
* 启动线程监控
*/
public static void start() {
monitor =new Thread(()->{
while (true) {
// 如果当前线程是被提示需要打断
if(Thread.currentThread().isInterrupted()){
log.info("日志监控停止...");
break;
}
try {
Thread.sleep(2000);
log.info("日志监控中...");
} catch (InterruptedException e) {
// 当主程序第一次打算该线程时,如果在sleep时被打断,则interrupt为false
// 当主程序第一次打算该线程时,如果在sleep之外时被打断,则interrupt为true,这时进入while时,interrupt为true
// 所以这里我们需要手动设置,该线程要被打断
monitor.interrupt();
e.printStackTrace();
}
}
},"monitor");
monitor.start();
}
/**
* 终止线程
*/
public static void stop() {
monitor.interrupt();
}
public static void main(String[] args) throws InterruptedException {
// 启动监控
start();
// 保证已经进入监控线程
Thread.sleep(8000);
// 停止监控
log.info("stop");
stop();
}
}
13:23:05.423 [monitor] INFO org.lc.interrupt.TwoPhaseTermination - 日志监控中...
13:23:07.427 [monitor] INFO org.lc.interrupt.TwoPhaseTermination - 日志监控中...
13:23:09.427 [monitor] INFO org.lc.interrupt.TwoPhaseTermination - 日志监控中...
13:23:11.423 [main] INFO org.lc.interrupt.TwoPhaseTermination - stop
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at org.lc.interrupt.TwoPhaseTermination.lambda$start$0(TwoPhaseTermination.java:31)
at java.lang.Thread.run(Thread.java:748)
13:23:11.423 [monitor] INFO org.lc.interrupt.TwoPhaseTermination - 日志监控停止...
4、打断park线程
①打断 park 线程, 不会清空打断状态
②如果打断标记已经是 true, 则 park 会失效
@Slf4j
public class ParkTest {
public static void main(String[] args) throws InterruptedException {
Thread t1=new Thread(()->{
log.info("park...");
// 执行到该代码时,该线程会阻塞
LockSupport.park();
log.info("unpark...");
log.info("打断状态:{}",Thread.currentThread().isInterrupted());
// 再次使用park,不会阻塞该线程。 当线程打断的标记为true时,park不会阻塞线程
LockSupport.park();
log.info("unpark");
},"t1");
t1.start();
Thread.sleep(2000);
// 打断park线程
t1.interrupt();
}
}
15:31:21.611 [t1] INFO org.lc.interrupt.ParkTest - park...
15:31:23.610 [t1] INFO org.lc.interrupt.ParkTest - unpark...
15:31:23.610 [t1] INFO org.lc.interrupt.ParkTest - 打断状态:true
15:31:23.611 [t1] INFO org.lc.interrupt.ParkTest - unpark
我们可以发现,打断后的park,线程标记为true,再次使用park时,park不会阻塞。即只有当线程标记为falses时park才有效。
结论:可以使用 Thread.interrupted() 清除打断状态
注意:
Thread.currentThread().isInterrupted()
和Thread.interrupted()
最好不要同时使用。Thread.interrupted()
要在Thread.currentThread().isInterrupted()
之前使用
5、 不推荐的方法
还有一些不推荐使用的方法,这些方法已过时,容易破坏同步代码块,造成线程死锁
方法名 | 是否static | 功能说明 |
---|---|---|
stop() | 停止线程运行 | |
suspend() | 挂起(暂停)线程运行 | |
resume() | 恢复线程运行 |
八、主线程和守护线程
默认情况下线程为非守护线程,Java 进程需要等待所有线程都运行结束,才会结束。有一种特殊的线程叫做守护线程,只要其它非守 护线程运行结束了,即使守护线程的代码没有执行完,也会强制结束。
设置守护线程的方法:setDaemon(true);
该方法为实例方法
@Slf4j
public class DaemonTest {
public static void main(String[] args) throws InterruptedException {
Thread t1=new Thread(()->{
try {
log.info("开始运行");
Thread.sleep(3000);
log.info("线程t1终止...");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"t1");
t1.setDaemon(true);
t1.start();
Thread.sleep(1000);
log.info("主线程终止...");
}
}
16:20:42.775 [t1] INFO org.lc.interrupt.DaemonTest - 开始运行
16:20:43.774 [main] INFO org.lc.interrupt.DaemonTest - 主线程终止...
非守护线程执行完后,所有守护线程无论是否执行完毕,直接退出。无需执行守护线程后面代码
注意
垃圾回收器线程就是一种守护线程
Tomcat 中的 Acceptor 和 Poller 线程都是守护线程,所以 Tomcat 接收到 shutdown 命令后,不会等 待它们处理完当前请求
九、线程的状态
1、五种状态(操作系统层面)
- 【初始状态】仅是在语言层面创建了线程对象,还未与操作系统线程关联
- 【可运行状态】(就绪状态)指该线程已经被创建(与操作系统线程关联),可以由 CPU 调度执行
- 【运行状态】指获取了 CPU 时间片运行中的状态
- 当 CPU 时间片用完,会从【运行状态】转换至【可运行状态】,会导致线程的上下文切换
- 【阻塞状态】
- 如果调用了阻塞 API,如 BIO(同步阻塞I/O) 读写文件,这时该线程实际不会用到 CPU,会导致线程上下文切换,进入 【阻塞状态】 等 BIO 操作完毕,会由操作系统唤醒阻塞的线程,转换至【可运行状态】
- 与【可运行状态】的区别是,对【阻塞状态】的线程来说只要它们一直不唤醒,调度器就一直不会考虑 调度它们
- 【阻塞状态】的线程会保护它的用户寄存器、程序计数器和堆栈指针等现场。处理器现在就可以转向执行其他就绪线程
- 【终止状态】表示线程已经执行完毕,生命周期已经结束,不会再转换为其它状态
2、六种状态(Java层面)
这是从 Java API 层面来描述的
根据 Thread.State
枚举,分为六种状态
NEW
线程刚被创建,但是还没有调用 start() 方法RUNNABLE
当调用了 start() 方法之后,注意,Java API 层面的 RUNNABLE 状态涵盖了 操作系统 层面的 【可运行状态】、【运行状态】和【阻塞状态】(由于 BIO(同步阻塞) 导致的线程阻塞,在 Java 里无法区分,仍然认为 是可运行)- 操作系统隐藏 Java 虚拟机(JVM)中的 READY 和 RUNNING 状态,它只能看到 RUNNABLE 状态,所以 Java 系统一般将【就绪状态】和【运行状态】这两个状态统称为 RUNNABLE(运行中) 状态 。
BLOCKED
,WAITING
,TIMED_WAITING
都是 Java API 层面对【阻塞状态】的细分,后面会在状态转换一节 详述TERMINATED
当线程代码运行结束
3、代码示例六种状态
@Slf4j
public class PeriodTest {
public static void main(String[] args) {
// 模拟新建状态NEW
Thread t1 = new Thread(() -> {
log.info("running...");
}, "t1");
// 模拟可运行状态Runnable
Thread t2 = new Thread(() -> {
while (true) {
// 即任务的重复执行,执行完后从运行状态又回到可运行状态
}
}, "t2");
t2.start();
// 模拟终止TERMINATED状态
Thread t3 = new Thread(() -> {
log.info("running...");
}, "t3");
t3.start();
// 模拟有时限的等待状态TIMED WAITING
Thread t4 = new Thread(() -> {
// 获得当前对象锁
synchronized (PeriodTest.class) {
try {
// 足够的睡眠时间
Thread.sleep(1000000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "t4");
t4.start();
// 模拟WAITING
Thread t5 = new Thread(() -> {
try {
// 阻塞状态 都不会得到任务调度器的调度
t2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t5");
t5.start();
// 模拟BLOCK
Thread t6 = new Thread(() -> {
// 此时的锁已经被t4线程拿到。所以这里拿不到锁
synchronized (PeriodTest.class) {
try {
Thread.sleep(1000000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "t6");
t6.start();
try {
// 等待t3线程执行完
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("t1的线程状态为:{}", t1.getState());
log.info("t2的线程状态为:{}", t2.getState());
log.info("t3的线程状态为:{}", t3.getState());
log.info("t4的线程状态为:{}", t4.getState());
log.info("t5的线程状态为:{}", t5.getState());
log.info("t6的线程状态为:{}", t6.getState());
}
}
17:30:16.284 [t3] INFO org.lc.interrupt.PeriodTest - running...
17:30:17.284 [main] INFO org.lc.interrupt.PeriodTest - t1的线程状态为:NEW
17:30:17.285 [main] INFO org.lc.interrupt.PeriodTest - t2的线程状态为:RUNNABLE
17:30:17.285 [main] INFO org.lc.interrupt.PeriodTest - t3的线程状态为:TERMINATED
17:30:17.285 [main] INFO org.lc.interrupt.PeriodTest - t4的线程状态为:TIMED_WAITING
17:30:17.285 [main] INFO org.lc.interrupt.PeriodTest - t5的线程状态为:WAITING
17:30:17.285 [main] INFO org.lc.interrupt.PeriodTest - t6的线程状态为:BLOCKED
十、习题
1、烧水泡茶(sleep)
阅读华罗庚《统筹方法》,给出烧水泡茶的多线程解决方案
@Slf4j
public class T8 {
public static void main(String[] args) {
Thread t1=new Thread(()->{
try {
log.info("洗水壶...");
Thread.sleep(1000);
log.info("烧水...");
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
},"张三");
Thread t2 = new Thread(() -> {
try {
log.info("洗茶壶...");
Thread.sleep(1000);
log.info("洗茶杯...");
Thread.sleep(1000);
log.info("拿茶叶...");
Thread.sleep(1000);
// 等待t1烧水的线程执行完。
// WAITING
t1.join();
log.info("泡茶叶...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "小三");
t1.start();
t2.start();
}
}
18:01:49.779 [小三] INFO org.lc.sleep_yield.T8 - 洗茶壶...
18:01:49.780 [张三] INFO org.lc.sleep_yield.T8 - 洗水壶...
18:01:50.782 [张三] INFO org.lc.sleep_yield.T8 - 烧水...
18:01:50.782 [小三] INFO org.lc.sleep_yield.T8 - 洗茶杯...
18:01:51.783 [小三] INFO org.lc.sleep_yield.T8 - 拿茶叶...
18:02:00.783 [小三] INFO org.lc.sleep_yield.T8 - 泡茶叶...
2、多线程下载图片
public class DownloadImage extends Thread{
private String url;
private String name;
public DownloadImage(String url, String name) {
this.url=url;
this.name=name;
}
protected void downloadImageHandler() {
try {
FileUtils.copyURLToFile(new URL(this.url),new File(this.name));
System.out.println("下载文件:"+this.name);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
this.downloadImageHandler();
}
public static void main(String[] args) {
DownloadImage d1=new DownloadImage("http://img.louchen.top/2020/05/rdb01.png", "1.png");
DownloadImage d2=new DownloadImage("http://img.louchen.top/2020/05/rdb02.png", "2.png");
DownloadImage d3=new DownloadImage("http://img.louchen.top/2020/05/aof01.png", "3.png");
DownloadImage d4=new DownloadImage("http://img.louchen.top/2020/05/aof02.png", "4.png");
d1.start();
d2.start();
d3.start();
d4.start();
}
}
3、龟兔赛跑
public class Race implements Runnable {
private static String winner;
protected boolean isOver(int step){
if(winner!=null){
return true;
}
if (step == 100) {
winner=Thread.currentThread().getName();
System.out.println("胜利者是:"+winner);
return true;
}
return false;
}
@Override
public void run() {
for (int i = 0; i <=100; i++) {
boolean over = this.isOver(i);
if(over){
break;
}
System.out.println(Thread.currentThread().getName()+":跑了"+i+"步;");
}
}
public static void main(String[] args) {
Thread t1 = new Thread(new Race(), "乌龟");
Thread t2 = new Thread(new Race(), "兔子");
t1.start();
t2.start();
}
}
Ⅲ、共享线程之管程(悲观锁)
1、共享带来的问题
①、java的体现
两个线程对初始值为 0 的静态变量一个做自增,一个做自减,各做 5000 次,结果是 0 吗?
@Slf4j
public class T1 {
static int count=0;
public static void main(String[] args) throws InterruptedException {
Thread t1=new Thread(()->{
for (int i = 0; i < 5000; i++) {
count++;
}
},"t1");
Thread t2=new Thread(()->{
for (int i = 0; i < 5000; i++) {
count--;
}
},"t2");
t1.start();
t2.start();
t1.join();
t2.join();
log.info("count:{}",count);
}
}
21:47:14.620 [main] INFO org.lc.synchronized_t.T1 - count:486
答案显然不为0
②、问题分析
以上的结果可能是正数、负数、零。为什么呢?因为 Java 中对静态变量的自增,自减并不是原子操作,要彻底理 解,必须从字节码来进行分析
例如对于 i++
而言(i 为静态变量),实际会产生如下的 JVM 字节码指令:
getstatic i // 获取静态变量i的值
iconst_1 // 准备常量1
iadd // 自增
putstatic i // 将修改后的值存入静态变量i
而对应 i--
也是类似:
getstatic i // 获取静态变量i的值
iconst_1 // 准备常量1
isub // 自减
putstatic i // 将修改后的值存入静态变量i
而 Java 的内存模型如下,完成静态变量的自增,自减需要在主存和工作内存中进行数据交换:
如果是单线程以上 8 行代码是顺序执行(不会交错)没有问题:
但多线程下这 8 行代码可能交错运行: 出现负数的情况:
出现正数的情况:
③、临界区 Critical Section
- 一个程序运行多个线程本身是没有问题的
- 问题出在多个线程访问共享资源
- 多个线程读共享资源其实也没有问题
- 在多个线程对共享资源读写操作时发生指令交错,就会出现问题
- 一段代码块内如果存在对共享资源的多线程读写操作,称这段代码块为临界区
例如,下面代码中的临界区:
static int counter = 0;
static void increment() // 临界区
{
counter++;
}
static void decrement() // 临界区
{
counter--;
}
④、竞态条件(Race Condition ):
多个线程在临界区内执行,由于代码的执行序列不同而导致结果无法预测,称之为发生了竞态条件
4、synchronized(也叫互斥锁)解决方案
为了避免临界区的竞态条件发生,有多种手段可以达到目的。
- 阻塞式的解决方案:synchronized,Lock
- 非阻塞式的解决方案:原子变量
**阻塞式的【对象锁】:**它采用互斥的方式让同一 时刻至多只有一个线程能持有【对象锁】,其它线程再想获取这个【对象锁】时就会阻塞住。这样就能保证拥有锁 的线程可以安全的执行临界区内的代码,不用担心线程上下文切换
注意:
虽然 java 中互斥和同步都可以采用 synchronized 关键字来完成,但它们还是有区别的:
- 互斥是保证临界区的竞态条件发生,同一时刻只能有一个线程执行临界区代码
- 同步是由于线程执行的先后、顺序不同、需要一个线程等待其它线程运行到某个点
①语法
线程1拿到对象锁之后,线程2来访问该临界区会发生blocked阻塞
注意:锁不能为基本数据类型。只能为实例类型或包装类,包括String,且不能为null。
synchronized(对象) // 线程1, 线程2(blocked)
{
临界区
}
②解决上述1中出现的问题
@Slf4j
public class T1 {
static int count=0;
// o对象作为锁对象
static Object o=new Object();
public static void main(String[] args) throws InterruptedException {
Thread t1=new Thread(()->{
for (int i = 0; i < 5000; i++) {
//临界区为count++
synchronized (o) {
count++;
}
}
},"t1");
Thread t2=new Thread(()->{
for (int i = 0; i < 5000; i++) {
////临界区为count--
synchronized (o) {
count--;
}
}
},"t2");
t1.start();
t2.start();
t1.join();
t2.join();
log.info("count:{}",count);
}
}
11:15:17.879 [main] INFO org.lc.synchronized_t.T1 - count:0
这时我们发现是始终为0
交替执行count++和count--。
假如t1线程中先拿到锁,执行完count++代码块之后,该线程释放锁,然后,t1和t2线程再次同时竞争该锁,谁拿到锁谁先执行,没拿到的等待。直到所有的代码程序执行完毕。
③时序提来解释上述过程
④通俗的方式解释
你可以做这样的类比:
- synchronized(对象) 中的对象,可以想象为一个房间(room),有唯一入口(门)房间只能一次进入一人 进行计算,线程 t1,t2 想象成两个人
- 当线程 t1 执行到 synchronized(room) 时就好比 t1 进入了这个房间,并锁住了门拿走了钥匙,在门内执行 count++ 代码
- 这时候如果 t2 也运行到了 synchronized(room) 时,它发现门被锁住了,只能在门外等待,发生了上下文切 换,阻塞住了
- 这中间即使 t1 的 cpu 时间片不幸用完,被踢出了门外(不要错误理解为锁住了对象就能一直执行下去哦), 这时门还是锁住的,t1 仍拿着钥匙,t2 线程还在阻塞状态进不来,只有下次轮到 t1 自己再次获得时间片时才 能开门进入
- **当 t1 执行完 synchronized{} 块内的代码,这时候才会从 obj 房间出来并解开门上的锁,**唤醒 t2 线程把钥 匙给他。t2 线程这时才可以进入 obj 房间,锁住了门拿上钥匙,执行它的 count-- 代码
⑤思考
如果把 synchronized(obj) 放在 for 循环的外面,如何理解?
@Slf4j public class T1 { static int count=0; // o对象作为锁对象 static Object o=new Object(); public static void main(String[] args) throws InterruptedException { Thread t1=new Thread(()->{ synchronized (o) { //相当于两条的自增指令,保持原子性执行了200次,该期间不会被其他线程所干扰 for (int i = 0; i < 100; i++) { System.out.println("t1"); count++; } } },"t1"); Thread t2=new Thread(()->{ synchronized (o) { for (int i = 0; i < 100; i++) { System.out.println("t2"); count++; } } },"t2"); t1.start(); t2.start(); t1.join(); t2.join(); log.info("count:{}",count); } }
每次需要把synchronized(锁){}中的代码块执行完后才会释放锁,即谁先拿到锁,谁先执行完所有for循环的count++或count--操作。
如果 t1 synchronized(obj1) 而 t2 synchronized(obj2) 会怎样运作?
- 锁住的对象不同,没有产生竞争关系。 无效
如果 t1 synchronized(obj) 而 t2 没有加会怎么样?如何理解?
- 对象没有产生竞争。无效
⑥面向对象改进
把需要保护的共享变量放入一个类
@Slf4j
public class T1 {
public static void main(String[] args) throws InterruptedException {
Room room=new Room();
Thread t1=new Thread(()->{
for (int i = 0; i < 1000; i++) {
System.out.println("t1");
room.increment();
}
},"t1");
Thread t2=new Thread(()->{
for (int i = 0; i < 1000; i++) {
System.out.println("t2");
room.decrement();
}
},"t2");
t1.start();
t2.start();
// 让t1线程跑完
t1.join();
// 让t2线程跑完
t2.join();
log.info("count:{}",room.getCount());
}
}
class Room{
private int count=0;
/**
* 加
*/
public void increment(){
// 锁住当前创建的对象
synchronized (this){
count++;
}
}
/**
* 减
*/
public void decrement(){
synchronized (this) {
count--;
}
}
/**
* 获得值
* @return
*/
public int getCount() {
synchronized (this) {
return count;
}
}
}
//交替执行t1 t2
12:56:13.825 [main] INFO org.lc.synchronized_t.T1 - count:0
5、方法上的synchronized
- 非静态方法锁的是对象, 静态方法因为共享性质,锁的是类对象(类对象只能有一份,即无论new多少个对象,其中的类对象只有一份)
- 类中的非静态成员储存在堆。静态方法储存在方法区,被该类中的对象所共享
①加在普通方法
**注意:**这里不叫锁方法,而是叫锁当前this对象
class Test{
public synchronized void test(){
}
}
//等价于==>
class Test{
public void test(){
synchronized(this){
}
}
}
②加在静态方法上
**注意:**这里叫锁类对象,因为在静态方法中无法调用this对象。
class Test{
public synchronized static void test(){
}
}
//等价于==>
class Test{
public static void test(){
synchronized(Test.class){
}
}
}
③线程八锁
其实就是考察 synchronized 锁住的是哪个对象(实例对象,还是类对象)
**情况1: ** 结果为1 2 或 2 1
@Slf4j
public class T2 {
public static void main(String[] args) {
Number number=new Number();
// 使用的同一对象,即锁同一对象,产生互斥
new Thread(()->{number.a();},"t1").start();
new Thread(()->{number.b();},"t2").start();
}
}
@Slf4j
class Number{
public synchronized void a() {
log.debug("1");
}
public synchronized void b() {
log.debug("2");
}
}
情况2: 结果为 2 (1秒后) 1 或 (1秒后) 1 2
@Slf4j
public class T2 {
public static void main(String[] args) {
Number number=new Number();
// 使用的同一对象,即锁同一对象,产生互斥
Thread t1=new Thread(()->{number.a();},"t1");
Thread t2=new Thread(()->{number.b();},"t2");
t1.start();
t2.start();
}
}
@Slf4j
class Number{
public synchronized void a() {
try {
//如果线程首先运行此代码,则后序线程使用该对象(是对象,不是方法)时,会阻塞。
Thread.sleep(1000);
log.debug("1");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public synchronized void b() {
log.debug("2");
}
}
情况3:
结果为: 3 (1秒后) 1 2 或 3 2 (1秒后) 1 或 2 3 (1秒后) 1
@Slf4j
public class T2 {
public static void main(String[] args) {
Number number=new Number();
// t1,t2使用的同一对象,即锁同一对象,产生互斥
Thread t1=new Thread(()->{number.a();},"t1");
Thread t2=new Thread(()->{number.b();},"t2");
// t3与(t1,t2)并发,t1与t2产生竞争关系
Thread t3=new Thread(()->{number.c();},"t3");
t1.start();
t2.start();
t3.start();
}
}
@Slf4j
class Number{
public synchronized void a() {
try {
Thread.sleep(1000);
log.debug("1");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public synchronized void b() {
log.debug("2");
}
public void c() {
log.debug("3");
}
}
**情况4:**结果为 2 (1秒后) 1
@Slf4j
public class T2 {
public static void main(String[] args) {
Number number1=new Number();
Number number2=new Number();
// t1,t2使用的不同对象,即不互斥 并行关系
Thread t1=new Thread(()->{number1.a();},"t1");
Thread t2=new Thread(()->{number2.b();},"t2");
t1.start();
t2.start();
}
}
@Slf4j
class Number{
public synchronized void a() {
try {
Thread.sleep(2000);
log.debug("1");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public synchronized void b() {
log.debug("2");
}
}
情况5: 结果为 2 (1秒后) 1
@Slf4j
public class T2 {
public static void main(String[] args) {
Number number1=new Number();
// t1执行的为类对象 t2为this实例对象。即为并行关系
Thread t1=new Thread(()->{number1.a();},"t1");
Thread t2=new Thread(()->{number1.b();},"t2");
t1.start();
t2.start();
}
}
@Slf4j
class Number{
// 锁的为类对象
public static synchronized void a() {
try {
Thread.sleep(1000);
log.debug("1");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 锁的为this对象实例
public synchronized void b() {
log.debug("2");
}
}
情况6: 结果为 2 (1秒后) 1 或 (1秒后) 1 2
@Slf4j
public class T2 {
public static void main(String[] args) {
Number number1=new Number();
// t1与t2执行的都是同一类对象。即为互斥关系
Thread t1=new Thread(()->{number1.a();},"t1");
Thread t2=new Thread(()->{number1.b();},"t2");
t1.start();
t2.start();
}
}
@Slf4j
class Number{
// 锁的为类对象
public static synchronized void a() {
try {
Thread.sleep(2000);
log.debug("1");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 锁的为类对象
public static synchronized void b() {
log.debug("2");
}
}
情况7: 2 (1秒后) 1
@Slf4j
public class T2 {
public static void main(String[] args) {
Number number1=new Number();
Number number2=new Number();
// t1锁的为类对象Number,无论new多少个对象,类对象只有一个。
// t2锁的为this对象实例
// t1与t2并行关系
Thread t1=new Thread(()->{number1.a();},"t1");
Thread t2=new Thread(()->{number2.b();},"t2");
t1.start();
t2.start();
}
}
@Slf4j
class Number{
// 锁的为类对象
public static synchronized void a() {
try {
Thread.sleep(2000);
log.debug("1");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 锁的为this实例对象
public synchronized void b() {
log.debug("2");
}
}
情况8: 结果为 2 (1秒后) 1 或 (1秒后) 1 2
@Slf4j
public class T2 {
public static void main(String[] args) {
Number number1=new Number();
Number number2=new Number();
// t1锁的为类对象Number,无论new多少个对象,类对象只有一个Number.class。
// t2锁的为类对象Number,无论new多少个对象,类对象只有一个Number.class。
// t1与t2互斥关系
Thread t1=new Thread(()->{number1.a();},"t1");
Thread t2=new Thread(()->{number2.b();},"t2");
t1.start();
t2.start();
}
}
@Slf4j
class Number{
// 锁的为类对象
public static synchronized void a() {
try {
Thread.sleep(2000);
log.debug("1");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 锁的为类对象
public static synchronized void b() {
log.debug("2");
}
}
结论:
锁类对象:首先判断是否为同一类对象,若为同一类对象,无论new多少次,类对象只有一份。即产生互斥
static synchronized void test(){}
synchronized(Test.class){}
锁this类对象: 若使用的同一实例对象(需要在临界区加synchronized),则产生互斥。
synchronized void test(){}
synchronized(this){}
6、变量的线程安全分析
①成员变量和静态变量是否线程安全
- 如果它们没有共享,则线程安全
- 如果它们被共享了,根据它们的状态是否能够改变,又分两种情况
- 如果只有读操作,则线程安全
- 如果有读写操作,则这段代码是临界区,需要考虑线程安全
②局部变量是否线程安全?
- 局部变量是线程安全的
- 但局部变量引用的对象则未必
- 如果该对象没有逃离方法的作用访问,它是线程安全的
- 如果该对象逃离方法的作用范围(即该对象为方法外的变量),需要考虑线程安全
局部变量线程安全分析
public static void test1() {
int i = 10;
i++;
}
每个线程调用 test1() 方法时局部变量 i,会在每个线程的栈帧内存中被创建多份,因此不存在共享。即每个线程在栈中的独立栈帧都会创建一个新的i变量内存
③引用的变量在方法外(产生线程安全问题)
public class T3 {
// 线程个数
static final int THRUED_NUMBER=2;
// 循环次数
static final int LOOP_NUMBER=200;
public static void main(String[] args) {
ThreadUnSafe test=new ThreadUnSafe();
for (int i = 0; i < THRUED_NUMBER; i++) {
// 这里两个线程共用同一个对象中的list成员变量。所以会发生remove异常(未添加元素就移除)
new Thread(()->{
test.method1(LOOP_NUMBER);
},"Thread:"+i).start();
}
}
}
class ThreadUnSafe{
// 成员变量
ArrayList<String> list=new ArrayList<>();
public void method1(int loopNumber) {
for (int i = 0; i < loopNumber; i++) {
//临界区 产生竞争条件
this.method2();
this.method3();
}
}
private void method2() {
list.add("1");
}
private void method3() {
list.remove(0);
}
}
Exception in thread "Thread:0" java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
at java.util.ArrayList.rangeCheck(ArrayList.java:657)
at java.util.ArrayList.remove(ArrayList.java:496)
at org.lc.synchronized_t.ThreadUnSafe.method3(T3.java:45)
at org.lc.synchronized_t.ThreadUnSafe.method1(T3.java:36)
at org.lc.synchronized_t.T3.lambda$main$0(T3.java:21)
at java.lang.Thread.run(Thread.java:748)
**分析:**即某一个线程先执行add,我们发现add源码中有一个size++,该size是成员变量,多个线程共享size操作,即会出现线程不安全的问题。即不同的线程操作add时,可能size可能出现竞争导致size写回的时候不会添加或者添加被覆盖的情况。即会出现索引越界的异常
④引用的变量在方法内(线程安全)
将 list 修改为局部变量
public class T3 {
// 线程个数
static final int THRUED_NUMBER=2;
// 循环次数
static final int LOOP_NUMBER=200;
public static void main(String[] args) {
ThreadUnSafe test=new ThreadUnSafe();
for (int i = 0; i < THRUED_NUMBER; i++) {
new Thread(()->{
test.method1(LOOP_NUMBER);
},"Thread:"+i).start();
}
}
}
class ThreadUnSafe{
public void method1(int loopNumber) {
// 局部变量
ArrayList<String> list=new ArrayList<>();
for (int i = 0; i < loopNumber; i++) {
this.method2(list);
this.method3(list);
}
}
private void method2(ArrayList<String> list) {
list.add("1");
}
private void method3(ArrayList<String> list) {
list.remove(0);
}
}
分析:
- list 是局部变量,每个线程调用时会创建其不同实例,没有共享
- 不同线程为list变量创建不同的栈帧内存。即不共享
- 而 method2 的参数是从 method1 中传递过来的,与 method1 中引用同一个对象
方法访问修饰符带来的思考,如果把 method2 和 method3 的方法修改为 public 会不会代理线程安全问题?
- 情况1:有其它线程调用 method2 和 method3
- 情况2:在 情况1 的基础上,为 ThreadSafe 类添加子类,子类覆盖 method2 或 method3 方法,即
class ThreadUnSafe{
public void method1(int loopNumber) {
ArrayList<String> list=new ArrayList<>();
for (int i = 0; i < loopNumber; i++) {
method2(list);
method3(list);
}
}
public void method2(ArrayList<String> list) {
list.add("1");
}
public void method3(ArrayList<String> list) {
list.remove(0);
}
}
class ThreadSafeSubClass extends ThreadUnSafe{
@Override
public void method3(ArrayList<String> list) {
new Thread(()->{
list.remove(0);
}).start();
}
}
结论:从这个例子可以看出 private 或 final 提供【安全】的意义所在,请体会开闭原则中的【闭】
7、常见线程安全类
String
Integer
StringBuffer
Random
Vector
Hashtable
java.util.concurrent 包下的类
这里说它们是线程安全的是指,多个线程调用它们同一个实例的某个方法时,是线程安全的。也可以理解为:
HashTable给每个方法加上synchronized给对象上锁。即多个线程操作同一个对象会产生互斥,只要是操作了对象中的任意synchronized方法
Hashtable hashtable=new Hashtable();
new Thread(()->{
hashtable.put("k1", "v1");
},"t1").start();
new Thread(()->{
hashtable.put("k2", "v2");
},"t2").start();
- 它们的每个方法是原子的
①线程安全类方法的组合(非线程安全)
这种组合方式,并不能保证原子性。
Hashtable hashtable=new Hashtable();
//多个线程操作 t1,t2
if(hashtable.get("k1")==null){
//可能在这两个操作中间存在其他线程的修改
hashtable.put("k1", "v1");
}
**注意:**假设线程1先拿到锁,先执行get操作,执行完后会释放锁。此时t2线程开始执行get操作,执行完后释放锁,恰巧t2线程又拿到了锁执行put操作,执行完后释放锁。然后t1线程拿到锁执行put操作,此时t2线程的put操作已被t1线程覆盖
②不可变类线程安全性
String、Integer 等都是不可变类,因为其内部的状态不可以改变,因此它们的方法都是线程安全的
String 有 replace,substring 等方法【可以】改变值啊,那么这些方法又是如何保证线程安 全的呢?
因为我们从源码可以发现,replace和substring 都是对其原来的字符串进行修改拷贝后重新new了一个新的字符串
public String substring(int beginIndex) {
if (beginIndex < 0) {
throw new StringIndexOutOfBoundsException(beginIndex);
}
int subLen = value.length - beginIndex;
if (subLen < 0) {
throw new StringIndexOutOfBoundsException(subLen);
}
return (beginIndex == 0) ? this : new String(value, beginIndex, subLen);
}
8、习题
①买票
@Slf4j
public class T4 {
// random线程安全
static Random random=new Random();
/**
* 随机获取买票数量 1-5
* @return
*/
public static int randomAmount() {
return random.nextInt(5)+1;
}
public static void main(String[] args) {
TicketWindow ticketWindow = new TicketWindow(2000);
// 储存线程
List<Thread> threadList=new ArrayList<>();
// 存储每个线程卖的票数
// 这里需要使用Vector来保证线程安全,因为多个线程多同一个集合进行操作
List<Integer> sellCount=new Vector<>();
for (int i = 0; i < 1000; i++) {
Thread thread=new Thread(()->{
try {
// 模拟线程创建延迟
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 多个线程对同一ticketWindow对象操作,产生线程安全问题
int sell = ticketWindow.sell(randomAmount());
// 多个线程对同一集合操作,产生线程安全问题
sellCount.add(sell);
});
// 这里的集合线程安全,因为只是对thread变量的引用进行添加操作,线程没有对threadList集合操作
threadList.add(thread);
// 启动线程
thread.start();
}
// 遍历所有线程
threadList.forEach(t-> {
try {
// 保证所有线程执行完毕
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 遍历集合进行买票求和操作
int sum = sellCount.stream().mapToInt(t -> t).sum();
log.info("卖的总票数为:{}",sum);
log.info("剩余总票数为:{}",ticketWindow.getCount());
}
}
//模拟售票窗口
class TicketWindow{
//余票
private int count;
public TicketWindow(int count) {
this.count = count;
}
//获取余票
public int getCount() {
return count;
}
/**
*售票方法
* @param amount 售票数量
* @return 成功售票,返回售票数量。售票失败。返回0
*/
public int sell(int amount){
if (this.count >= amount) {
this.count -= amount;
return amount;
}
return 0;
}
}
结果 售票数量和初始余票数量(2000)不一致。线程不安全。
12:57:03.578 [main] INFO org.lc.synchronized_t.T4 - 卖的总票数为:2048
12:57:03.582 [main] INFO org.lc.synchronized_t.T4 - 剩余总票数为:0
解决方法: 给售票方法(临界区)加synchornized
/**
*售票方法
* @param amount 售票数量
* @return 成功售票,返回售票数量。售票失败。返回0
*/
public synchronized int sell(int amount){
if (this.count >= amount) {
this.count -= amount;
return amount;
}
return 0;
}
②转账
@Slf4j
public class T5 {
static Random random = new Random();
//随机转账1到100
public static int randomInt() {
return random.nextInt(100) + 1;
}
public static void main(String[] args) throws InterruptedException {
// 账户a 1000
Account a = new Account(1000);
// 账户b 1000
Account b = new Account(1000);
// 线程t1实现a对b的多次随机转账
Thread t1 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
// 线程不安全
a.transfer(b, randomInt());
}
});
// 线程2实现b对a的多次随机转账
Thread t2=new Thread(()->{
for (int i = 0; i < 1000; i++) {
// 线程不安全
b.transfer(a, randomInt());
}
});
t1.start();
t2.start();
t1.join();
t2.join();
log.info("账户a:{}",a.getMoney());
log.info("账户b:{}",b.getMoney());
}
}
//账户类
class Account {
// 账户余额
private int money;
public Account(int money) {
this.money = money;
}
public int getMoney() {
return money;
}
public void setMoney(int money) {
this.money = money;
}
/**
* 转账操作
*
* @param target 目标转账用户
* @param amount 转账金额
*/
public void transfer(Account target, int amount) {
if (this.money >= amount) {
this.setMoney(this.getMoney() - amount);
target.setMoney(target.getMoney() + amount);
}
}
}
结果为:账户完全紊乱。
15:42:02.613 [main] INFO org.lc.synchronized_t.T5 - 账户a:1607
15:42:02.617 [main] INFO org.lc.synchronized_t.T5 - 账户b:0
首先我们可以想到给方法加锁:
public synchronized void transfer(Account target, int amount) {
if (this.money >= amount) {
this.setMoney(this.getMoney() - amount);
target.setMoney(target.getMoney() + amount);
}
}
注意:上述方法是给当前对象加锁,即当前调用transfer方法的对象加锁,但是传过来的Account对象还是两个线程在共享使用,即会出现问题。
最终解决:我们给当前类对象加锁,尽管你创建多个账户对象,但是对个线程进行转账的时候,只有一个线程能执行该方法。即类对象只有一个,给类对象加锁,被多个线程使用时互斥。但是这会出现一个缺点,同一个时间只有一个线程能做此事,导致效率非常低。
public void transfer(Account target, int amount) {
synchronized (Account.class) {
if (this.money >= amount) {
this.setMoney(this.getMoney() - amount);
target.setMoney(target.getMoney() + amount);
}
}
}
15:46:10.728 [main] INFO org.lc.synchronized_t.T5 - 账户a:1999
15:46:10.733 [main] INFO org.lc.synchronized_t.T5 - 账户b:1
9、Synchronized原理
①Monitor概念
以 32 位虚拟机为例
1)普通对象:
Klass Word 存储的是对象的类型,即类对象的指针
Mark Word 包含的为对象的详细信息,即对象的成员
Klass Word和Mark Word合起来占用64个bit位组成对象头信息
例如:
- Integer对象大小=存储Integer类对象的指针4字节+储存值的4字节
- int 占4字节。所以我们发现Integer存储的值比int存储的值占用空间多3倍
2)数组对象
- 对象头占96bit,其中数组长度占32bit
3)Mark Word结构:
注意: 无锁状态和偏向锁的状态区分为biased_lock的值,1启用偏向锁,0未启用偏向锁
这里我们主要分析32位的虚拟机
无锁状态(Normal): hashcode
占25bit | 分代年龄占4bit | 是否启用偏向锁占1bit | 锁的标记位01 占2bit
无锁状态(Normal): 线程ID| epoch(批量重定向和批量撤销) | 分代年龄 | 是否启用偏向锁| 锁的标记位01 占2bit
轻量级锁(Lightweight Locked): 指向锁记录的指针 占30bit | 锁的标记位00 占2bit
重量级锁(HeavyWeight Locked): 指向重量级锁的monitor指针 占30bit | 锁的标记位10 占2bit
②Monitor原理(重量级锁加解锁流程)
Monitor 被翻译为监视器或管程 每个 Java 对象都可以关联一个 Monitor 对象,如果使用 **synchronized 给对象上锁(重量级)**之后,该对象头的 Mark Word 中就被设置指向 Monitor 对象的指针
- 第一个线程进行获取锁的操作:当我们的一个线程Thread2进入synchronized代码块获得锁的时候。首先我们将加锁的对象obj和操作系统提供的一个monitor对象相关联,将对象obj中的markword替换为mintor对象的指针地址(占30bit),然后将markword中的锁的标志位改为10。然后将monitor中的Owner锁的持有者和线程Thread2相关联
- 第二三个线程进行获取锁的操作:当第二个线程Thread-1来对获取锁时,首先检查该monitor中的owner是否有其他线程占用,如果被占用则线程Thread-1进入阻塞队列,该阻塞队列和monitor中的EntryList(链表)相关联。若后续还有一个线程Thread-3来获取锁的时候,同样去monitor对象中查看owner是否被其他线程所占用,被占有则进入阻塞队列。
- 当我们的线程Thread-2中临界区中的代码执行完毕的时候,释放锁。此时Thread-1和Thread-3采用非公平锁的方式竞争此锁,若线程Thread-2获得锁,即将monitor中的owner与线程Thread-2相关联,其他线程阻塞。
总结:
注意
- synchronized 必须是进入同一个对象的 monitor 才有上述的效果 ,即进入synchronize获得不同的锁对象关联的monitor对象不同
- 不加 synchronized 的对象不会关联监视器,不遵从以上规则
③字节码形式理解synchronized(加解锁流程)
public class T1 {
static final Object lock=new Object();
static int count=0;
public static void main(String[] args) {
synchronized (lock) {
count++;
}
}
}
字节码
public static void main(java.lang.String[]);
descriptor: ([Ljava/lang/String;)V
flags: ACC_PUBLIC, ACC_STATIC
Code:
stack=2, locals=3, args_size=1
//从常量池中取出Object对象
0: getstatic #2 // Field lock:Ljava/lang/Object; <- lock引用 (synchronized开始)
//赋值一份锁的地址引用
3: dup
//将赋值的锁对象地址引用存入一个临时slot1匿名槽中
4: astore_1 // lock引用 -> slot 1
//将锁对象的中的markword替换为monitor地址指针
5: monitorenter // 将 lock对象 MarkWord 置为 Monitor 指针
//获取字段变量i
6: getstatic #3 // Field count:I
//准备常量1
9: iconst_1
//开始的i++的操作
10: iadd
//将得出的值赋值给字段i
11: putstatic #3 // Field count:I
//加载赋值的一份锁的引用
14: aload_1
//进行解锁操作
15: monitorexit // 将 lock对象 MarkWord 重置,将monitor中存储锁对象的头中的hashcode、分代年龄等还原到锁对象的对象头中。 然后唤醒阻塞队列EntryList中的线程
16: goto 24
19: astore_2
20: aload_1
21: monitorexit
22: aload_2
23: athrow
24: return
Exception table:
from to target type
//如果6-16即synchronized中的代码发生异常,则解锁后抛出异常
6 16 19 any
//如果19-22即捕获时发生异常,再次捕获,然后解锁抛出异常
19 22 19 any
LineNumberTable:
line 14: 0
line 15: 6
line 16: 14
line 17: 24
LocalVariableTable:
Start Length Slot Name Signature
0 25 0 args [Ljava/lang/String;
④轻量级锁
轻量级锁的使用场景:如果一个对象虽然有多线程要加锁,但加锁的时间是错开的(也就是没有竞争),那么可以 使用轻量级锁来优化。 轻量级锁对使用者是透明的,即语法仍然是 synchronized 假设有两个方法同步块,利用同一个对象加锁。
满足轻量级锁的条件: 例如当线程t1对方法method1加锁执行,当线程t1对mehtod1进行加解锁执行完毕后,线程2再对方法mehtod2进行加锁的执行,此时该对象没有竞争。
static final Object obj = new Object();
public static void method1() {
synchronized( obj ) {
// 同步块 A
method2();
}
}
public static void method2() {
synchronized( obj ) {
// 同步块 B
}
}
加解锁流程
首先一个线程进入到mehtod1方法的synchronized块时,会在线程的栈帧中创建锁记录(Lock Record)对象,每个线程都的栈帧都会包含一个锁记录的结构,内部可以存储锁定对象的 Mark Word
会让锁记录的Object reference指向锁对象。并尝试用cas替换Object的Mark Word, 将Mark Word的值(hashcode、分代年龄、是否为偏向锁、锁标志位01(代表无锁状态))存入锁记录,将锁记录(该锁记录的地址、锁标志位00表示轻量级锁)存入锁对象的Mark Word中
如果 cas 替换成功,则对象头中存储了 指向栈中的锁记录地址和锁标志位00(代表轻量级锁) ,表示由该线程给对象加锁,这时图示如下
如果cas替换失败,有两种情况
- 升级为重量级锁:如果是其它线程已经持有了该 Object 的轻量级锁,这时表明有竞争,那么此时当前线程只有通过自旋的方式去获取锁。如果在自旋一定次数后仍为获得锁,那么轻量级锁将会升级成重量级锁进入锁膨胀过程 (这里我们不讨论)
- 锁重入:如果是自己执行了 synchronized 锁重入(也就是说同一线程执行method1中的method2方法中也有synchronized时,对同一个对象加了两次锁),那么再添加一条 Lock Record 作为重入的计数。此时也将第二个锁记录的Object reference指向该锁对象,但是此时进行和对锁象头cas交换会失败,则锁记录地址储存为null,但是不影响。但是我们可以根据锁记录的个数来指定重入了几次锁。
当退出 synchronized 代码块(解锁时)如果有取值为 null 的锁记录,表示有重入,这时移除该锁记录,表示重入计数减一
当退出 synchronized 代码块(解锁时)锁记录的值不为 null,这时使用 cas 将 Mark Word 的值恢复给对象头
- 成功,则解锁成功
- 失败,说明轻量级锁进行了锁膨胀或已经升级为重量级锁,进入重量级锁解锁流程(这里我们不讨论)
⑤锁膨胀(轻量级锁膨胀为重量级锁)
如果在尝试加轻量级锁的过程中,CAS 操作无法成功,这时一种情况就是有其它线程为此对象加上了轻量级锁(有竞争),这时需要进行锁膨胀,将轻量级锁变为重量级锁。即一个线程已经对对象加了轻量级锁,此时另一个线程也来对该对象加锁
static Object obj = new Object();
public static void method1() {
synchronized( obj ) {
// 同步块
}
}
当 Thread-1 进行轻量级加锁时,Thread-0 已经对该对象加了轻量级锁
即线程Thread-1发现对象头的锁标志位已经为00(表示已经加了轻量级锁),那么cas失败
这时 Thread-1 加轻量级锁失败,进入锁膨胀流程
- 即为 Object 对象申请 Monitor 锁,让 Object 对象头指向monitor地址(并将锁标志位改为10,代表重量级锁)
- 然后线程Thread-1自己进入 Monitor 的 EntryList BLOCKED阻塞
当 Thread-0 退出同步块解锁时,使用 cas 将 Mark Word 的值恢复给对象头,CAS失败(因为此时锁对象头的信息已经被修改)。这时会进入重量级解锁 流程,即按照 Monitor 地址找到 Monitor 对象,设置 Owner 为 null,唤醒 EntryList 中 BLOCKED 线程
⑥自旋优化
多个线程竞争的时候,还可以使用自旋来进行优化,如果当前线程自旋成功(即这时候持锁线程已经退出了同步 块,释放了锁),这时当前线程就可以避免阻塞。
自旋重试成功的情况
当线程1对对象加锁执行的过程中,如果线程2来对此同步代码块加锁的时候会失败,此时线程2不会立刻进入阻塞状态,而是通过几次自选重试来获取锁,如果成功则加锁执行同步代码块。这样的好处时避免阻塞唤醒时造成的cpu上下文切换导致效率变低。
自旋重试失败的情况
如果线程2自旋重试一定次数还未获得锁,才进入阻塞状态
注意
- 自旋会占用 CPU 时间,单核 CPU 自旋就是浪费,多核 CPU 自旋才能发挥优势。
- 在 Java 6 之后自旋锁是自适应的,比如对象刚刚的一次自旋操作成功过,那么认为这次自旋成功的可能性会 高,就多自旋几次;反之,就少自旋甚至不自旋,总之,比较智能。
- Java 7 之后不能控制是否开启自旋功能
- 自旋的优缺点和使用场景:减少线程上下文切换,效率能够提升,但是会消耗cpu资源,适用于多核cpu。自旋适合于多核cpu,被锁对象的时间比较短的场景
⑦偏向锁
轻量级锁在没有竞争时(就自己这个线程),每次重入仍然需要执行 CAS 操作。 也就数说本线程多次对锁的获取时都会执行CAS操作,都会将锁的对象头和锁记录进行比较和交换的操作,导致效率很低。
Java 6 中引入了偏向锁来做进一步优化:只有第一次使用 CAS 将线程 ID 设置到对象的 Mark Word 头,之后发现 这个线程 ID 是自己的就表示没有竞争,不用重新 CAS。以后只要不发生竞争,这个对象就归该线程所有。例如:
static final Object obj = new Object();
public static void m1() {
synchronized( obj ) {
// 同步块 A
m2();
}
}
public static void m2() {
synchronized( obj ) {
// 同步块 B
m3();
}
}
public static void m3() {
synchronized( obj ) {
// 同步块 C
}
}
1)偏向状态
以64位虚拟机为例
一个对象创建时:
- 如果开启了偏向锁(默认开启),那么对象创建后,markword 值为 0x05 即后 3 位为 101,这时它的 thread、epoch、age 都为 0
- 偏向锁是默认是延迟的,不会在程序启动时立即生效,如果想避免延迟,可以加 VM 参数
XX:BiasedLockingStartupDelay=0
来禁用延迟 - 如果没有开启偏向锁,那么对象创建后,markword 值为 0x01 即后 3 位为 001,这时它的 hashcode、 age 都为 0,第一次用到 hashcode 时才会赋值,即调用hashcode方法会撤销偏向锁。
测试偏向锁(默认开启):
利用 jol 第三方工具来查看对象头信息
输出:
- 我们发现创建的时候默认是偏向锁的状态,加锁后使用的是偏向锁,解锁后也是保留的偏向锁的状态.
- 处于偏向锁的对象解锁后,线程 id 仍存储于对象头中
测试禁用偏向锁:
在上面测试代码运行时在添加 VM 参数 -XX:-UseBiasedLocking
禁用偏向锁
输出:
- 我们创建的对象是无锁的状态,加锁后为轻量级锁,解锁后变为无锁的状态
2)撤销--调用对象hashcode
调用了对象的 hashCode,但偏向锁的对象 MarkWord 中存储的是线程 id,如果调用 hashCode 会导致偏向锁被 撤销
- 轻量级锁会在锁记录中记录 hashCode
- 重量级锁会在 Monitor 中记录 hashCode
在调用 hashCode 后使用偏向锁,记得去掉 -XX:-UseBiasedLocking
启用偏向锁
输出
我们发现调用hashcode偏向锁撤销为无锁状态,加锁后变为轻量级锁,解锁后变为无锁状态
偏向锁撤销的原因
- 因为当对象为偏向锁的时候存入线程id后,调用hashcode方法存入hashcode时无法存下,所以会撤销为无锁状态去存这个hashcode
3)撤销--其他线程使用该锁对象(无竞争)
当有其它线程使用偏向锁对象时,会将偏向锁升级为轻量级锁
注意: 这里的其他的线程使用该锁对象时,必须没有竞争,否则就成重量级锁了
private static void test2() throws InterruptedException {
Dog d = new Dog();
Thread t1 = new Thread(() -> {
synchronized (d) {
log.debug(ClassLayout.parseInstance(d).toPrintableSimple(true));
}
synchronized (TestBiased.class) {
TestBiased.class.notify();
}
// 如果不用 wait/notify 使用 join 必须打开下面的注释
// 因为:t1 线程不能结束,否则底层线程可能被 jvm 重用作为 t2 线程,底层线程 id 是一样的
/*try {
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}*/
}, "t1");
t1.start();
Thread t2 = new Thread(() -> {
//保证等待t1线程对对象d加解锁执行完后在执行本线程
synchronized (TestBiased.class) {
try {
TestBiased.class.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug(ClassLayout.parseInstance(d).toPrintableSimple(true));
synchronized (d) {
log.debug(ClassLayout.parseInstance(d).toPrintableSimple(true));
}
log.debug(ClassLayout.parseInstance(d).toPrintableSimple(true));
}, "t2");
t2.start();
}
输出:
我们可以发现,线程t1首先对对象d进行加锁,加锁后为偏向锁,解锁后储存线程id还是保持偏向锁状态。然后线程t2再对d对象进行加锁,此时发现偏向锁已撤销,转变为轻量级锁,解锁后转变为无锁状态
[t1] - 00000000 00000000 00000000 00000000 00011111 01000001 00010000 00000101
[t2] - 00000000 00000000 00000000 00000000 00011111 01000001 00010000 00000101
[t2] - 00000000 00000000 00000000 00000000 00011111 10110101 11110000 01000000
[t2] - 00000000 00000000 00000000 00000000 00000000 00000000 00000000 00000001
4)撤销--调用wait/notify
当线程t1对对象d加锁后使用wait阻塞,然后释放锁。此时t2线程再对该对象d加锁,然后唤醒线程t1继续执行。
输出:
加锁前锁对象为偏向锁状态。首先t1线程对锁对象d加锁,这时还是偏向锁,然后调用wait进入阻塞状态释放锁。然后t2线程对锁对象d加锁,然后调用notify唤醒线程t1继续执行,此时t1线程执行完后,发现此时已经变为重量级锁状态
[t1] - 00000000 00000000 00000000 00000000 00000000 00000000 00000000 00000101
[t1] - 00000000 00000000 00000000 00000000 00011111 10110011 11111000 00000101
[t2] - notify
[t1] - 00000000 00000000 00000000 00000000 00011100 11010100 00001101 11001010
5)批量重定向(无竞争)---重新偏向线程id
如果对象虽然被多个线程访问,但没有竞争,这时偏向了线程 T1 的对象仍有机会重新偏向 T2,重偏向会重置对象 的 Thread ID。
例如
线程t1分别对30个锁对象进行加锁和解锁后(没有其他线程竞争),此时的对象仍然为偏向锁,线程id偏向t1线程。
线程t2分别再对这30个锁对象进行加锁和解锁操作,此时前19个锁对象t2线程对其加锁后,锁对象撤销为轻量级锁,解锁后锁对象为无锁状态。 从第20个对象开始,线程t2对其锁对象进行加锁的时候,锁对象还是偏向锁,此时线程id偏向线程t2,解锁后还是为偏向锁,保持该偏向锁状态(线程id偏向线程t2)。
注意: 这里的重定向是批量操作。 默认的批量重定向开始为第20次
6)批量撤销(无竞争)---撤销该类的所有偏向锁对象
当撤销偏向锁阈值为 40 次时,jvm 会这样觉得,自己确实偏向错了,根本就不该偏向。于是整个类的所有对象 都会变为不可偏向的,新建的对象也是不可偏向的
例如
- 线程t1对39个锁对象进行加解锁操作。此时的锁对象都为偏向锁,线程id偏向线程t1
- 线程t2再对39个锁对象进行加解锁操作,此时前19个对象加锁后撤销偏向锁为轻量级锁,解锁后锁对象为无锁状态。从第20个对象开始,开始将后面的锁对象批量重定向为指向t2线程的偏向锁。
- 线程t3再次对39个锁对象进行加解锁操作,此时的前19个对象都是无锁状态,加锁后变为轻量级锁,解锁后变为无锁状态。注意:从第20个对象开始,此时的锁对象都是指向t2线程的偏向锁 ,t2线程开始将第20个和后面的锁对象加锁后批量撤销为轻量级锁,解锁后为无锁状态。此时所有39对象都是无锁状态。
- 此时当第40个对象创建时,此时新创建的对象为无锁状态的对象,而不是偏向锁。
⑧偏向所锁,轻量级锁及重量级锁总结
- 偏向锁,轻量级锁都是乐观锁,他没有线程的竞争。一旦发生竞争则晋升为重量级锁,也叫悲观锁。
- 偏向锁默认是开启的,也就是说当对象一开始创建的时候,他就是偏向锁的状态(但是没有线程id的指向),当一个线程对此对象进行加锁的时候,该对象持有该线程的偏向锁(线程id指向该线程),解锁后保持该偏向锁的状态。当另一个线程对此对象进行加锁的时候,此时会撤销此偏向锁,转换为轻量级锁的状态,解锁后转为无锁状态。偏向锁和轻量级锁的只要发生竞争都会变为重量级锁。
⑨其他优化
1)减少上锁时间
同步代码块中尽量短
2) 减少锁的粒度
将一个锁拆分为多个锁提高并发度
- ConcurrentHashMap
- LongAddr
- LinkedBlockingQueue 入队和出队使用不同的锁,相对于LinkedBlockingArray只有一个锁效率要 高
3)锁粗化
多次循环进入同步块不如同步块内多次循环 另外 JVM 可能会做如下优化,把多次 append 的加锁操作 粗化为一次(因为都是对同一个对象加锁,没必要重入多次)
new StringBuffer().append("a").append("b").append("c");
4)锁消除
JVM 会进行代码的逃逸分析,例如某个加锁对象是方法内局部变量,不会被其它线程所访问到,这时候 就会被即时编译器(JIT)忽略掉所有同步操作。
5) 读写分离
CopyOnWriteArrayList
ConyOnWriteSet
10、wait、notify
①为什么需要wait
②wait原理
- Owner 线程发现条件(即该线程已经获得了锁)不满足,调用 wait 方法,即可进入 WaitSet 变为 WAITING 状态
- BLOCKED 和 WAITING 的线程都处于阻塞状态,不占用 CPU 时间片
- BLOCKED 线程会在 Owner 线程释放锁时唤醒
- WAITING 线程会在 Owner 线程调用 notify 或 notifyAll 时唤醒,但唤醒后并不意味者立刻获得锁,仍需进入 EntryList 重新竞争
EntryList锁池和WaitSet等待池区别?
对于java虚拟机中,运行程序的每一个对象来说,都有两个池,锁池EntryList、等待池WaitSet,而这两个吃又与Object基类的wait,notify,notifyAll三个方法,以及synchronized相关。
- 锁池EntryList,假设线程A已经拥有了某个对象(不是类)的锁,而其它线程B、C想要调用这个对象的某个synchronized方法(或者块),由于B、C线程在进入对象的synchronized方法(或者块)之前必须先获得该对象锁的拥有权,而恰巧该对象的锁目前正被线程A所占用,此时B、C线程就会被阻塞,进入一个地方去等待锁的释放,这个地方便是该对象的锁池,就是将B、C加入到锁池里面。
- 等待池WaitSet,假设线程A调用了某个对象的wait方法,线程A就会释放该对象的锁,同时线程A就进入到了该对象的等待池中,进入该等待池中的线程不会去竞争该对象的锁,如果线程B执行完之后调用了notify和notifyall方法的话,则处于该对象等待池中的被唤醒的线程A就会进入到该对象lock锁池中,锁池中的对象就会竞争该对象的锁,如果线程B执行完之后,就会将锁自动的释放掉,因此线程A就获得到了锁,但是真实的开发中,多个线程去竞争这个锁,优先级高的线程竞争到这个锁的机率更大,假如某个线程没有竞争到该对象锁,它只会留在锁池中,并不会重新进入到等待池中,而竞争到该对象锁的线程继续向下执行业务逻辑,直到执行完了synchronized方法(或者块)或者遇到了异常才会释放掉该对象锁,这时,锁池中的线程会继续竞争该对象锁。
③API介绍
调用wait方法,会取消偏量锁撤销,升级为重量锁
WAITING 线程会在 Owner 线程调用 notify 或 notifyAll 时唤醒,但唤醒后并不意味者立刻获得锁,仍需进入 EntryList 重新竞争
obj.wait() 让进入 object 监视器的线程到 waitSet 等待
obj.notify() 在 object 上正在 waitSet 等待的线程中挑一个唤醒
obj.notifyAll() 让 object 上正在 waitSet 等待的线程全部唤醒
它们都是线程之间进行协作的手段,都属于 Object 对象的方法。必须获得此对象的锁,才能调用这几个方法
即必须为锁的这个对象才能调用wait/notify。即synchronized(lock)中的 lock对象调用wait/notify
注意:没有获得锁的线程执行wait/notify方法会报错
public class T1 { static final Object object=new Object(); public static void main(String[] args) { try { object.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } }
Exception in thread "main" java.lang.IllegalMonitorStateException at java.lang.Object.wait(Native Method) at java.lang.Object.wait(Object.java:502) at org.lc.wait_notify.T1.main(T1.java:14)
正确的姿势:
public class T1 { static final Object object=new Object(); public static void main(String[] args) { try { synchronized (object) { object.wait(); } } catch (InterruptedException e) { e.printStackTrace(); } } }
wait(),notify(),notifyAll()
实例:
@Slf4j
public class T1 {
static final Object object=new Object();
public static void main(String[] args) throws InterruptedException {
new Thread(()->{
synchronized (object) {
log.info("开始执行...");
try {
//线程进入waiting阻塞
object.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("其他代码...");
}
},"t1").start();
new Thread(()->{
synchronized (object) {
log.info("开始执行...");
try {
//线程进入waiting阻塞
object.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("其他代码...");
}
},"t2").start();
// 先让其他线程执行
Thread.sleep(2000);
synchronized (object) {
// 唤醒一个线程(唤醒的线程竞争)
// object.notify();
// 唤醒所有waiting线程
object.notifyAll();
}
}
}
11:28:06.797 [t1] INFO org.lc.wait_notify.T1 - 开始执行...
11:28:06.799 [t2] INFO org.lc.wait_notify.T1 - 开始执行...
11:28:08.795 [t2] INFO org.lc.wait_notify.T1 - 其他代码...
11:28:08.795 [t1] INFO org.lc.wait_notify.T1 - 其他代码...
wait(long n)
有时限的等待, 到 n 毫秒后结束等待,重新争抢锁。或是被 notify/notifyAll
@Slf4j
public class T2 {
static final Object object=new Object();
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
synchronized (object) {
log.info("开始执行...");
try {
//线程进入waiting阻塞
object.wait(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("其他代码...");
}
}, "t1").start();
}
}
11:32:31.444 [t1] INFO org.lc.wait_notify.T2 - 开始执行...
11:32:34.447 [t1] INFO org.lc.wait_notify.T2 - 其他代码...
3秒后自动被唤醒
④wait(long n)和sleep(long n)的区别
sleep 是 Thread 方法,而 wait 是 Object 的方法
sleep 不需要强制和 synchronized 配合使用,但 wait 需要 和 synchronized 一起用
sleep 在睡眠的同时,不会释放对象锁的,但 wait 在等待的时候会释放对象锁
它们 状态都是 TIMED_WAITING
⑤唤醒占用同一锁的其他指定线程-实例
**注意:**这里不能用sleep,因为sleep不会释放锁,而要用wiat,wait会释放锁
用 notifyAll 仅解决某个线程的唤醒问题,但使用 if + wait 判断仅有一次机会,一旦条件不成立,就没有重新 判断的机会了
解决方法,用 while + wait,当条件不成立,再次 wait
**虚假唤醒:**notify 只能随机唤醒一个 WaitSet 中的线程,这时如果有其它线程也在等待,那么就可能唤醒不了正确的线 程,称之为【虚假唤醒】 解决方法,改为 notifyAll
@Slf4j
public class T3 {
// 锁
final static Object room = new Object();
// 是否有烟
static boolean hasSomke = false;
// 是否有外卖
static boolean hasLunch = false;
public static void main(String[] args) throws InterruptedException {
new Thread(()->{
synchronized (room) {
log.info("有烟没?{}",hasSomke);
while (!hasSomke) {
log.info("先休息一下...");
try {
// 若错误被唤醒,那么会重新判断条件
room.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.info("有烟没?{}",hasSomke);
if (hasSomke) {
log.info("开始干活...");
}
}
},"烟枪王").start();
new Thread(()->{
synchronized (room) {
log.info("午饭到没?{}",hasLunch);
if (!hasLunch) {
log.info("做等饭来...");
try {
room.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.info("午饭到没?{}", hasLunch);
if (hasLunch) {
log.info("开吃...");
}
}
},"大胃王").start();
// 先执行其他线程
Thread.sleep(2000);
synchronized (room) {
// 预先送饭
hasLunch=true;
log.info("送饭了...");
// 当我们使用notity()时,这是可能会唤醒送烟线程,导致虚假唤醒
// notify 只能随机唤醒一个 WaitSet 中的线程
// room.notify();
room.notifyAll();
}
}
}
12:36:46.600 [烟枪王] INFO org.lc.wait_notify.T3 - 有烟没?false
12:36:46.605 [烟枪王] INFO org.lc.wait_notify.T3 - 先休息一下...
12:36:46.605 [大胃王] INFO org.lc.wait_notify.T3 - 午饭到没?false
12:36:46.605 [大胃王] INFO org.lc.wait_notify.T3 - 做等饭来...
12:36:48.600 [main] INFO org.lc.wait_notify.T3 - 送饭了...
12:36:48.600 [大胃王] INFO org.lc.wait_notify.T3 - 午饭到没?true
12:36:48.600 [大胃王] INFO org.lc.wait_notify.T3 - 开吃...
12:36:48.600 [烟枪王] INFO org.lc.wait_notify.T3 - 先休息一下...
11、同步模式之保护性暂停
①.定义
即 Guarded Suspension,用在一个线程等待另一个线程的执行结果 要点
有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject
如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)
JDK 中,join 的实现、Future 的实现,采用的就是此模式
因为要等待另一方的结果,因此归类到同步模式
②应用- 超时效果
//下载返回读取的网页
public class Downloader {
public static List<String> download() throws IOException {
HttpsURLConnection connection= (HttpsURLConnection) new URL("https://www.baidu.com/").openConnection();
List<String> list=new ArrayList<>();
try(BufferedReader reader=new BufferedReader(new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8))) {
String line;
while ((line = reader.readLine()) != null) {
list.add(line);
}
}
return list;
}
}
@Slf4j
public class T4 {
public static void main(String[] args) {
// 线程1等待线程2的下载结果
GuardedObject guardedObject=new GuardedObject();
new Thread(()->{
// 等待结果
log.info("等待结果...");
// 设置超时时间
List<String> o = (List<String>) guardedObject.get(2000);
log.info("下载的内容:{}", o==null?"已超时...":o.toString());
},"t1").start();
new Thread(()->{
log.info("执行下载");
try {
// 下载
List<String> download = Downloader.download();
// 传入下载的结果
// 这时会唤醒阻塞的线程
guardedObject.complete(download);
} catch (IOException e) {
e.printStackTrace();
}
},"t2").start();
}
}
class GuardedObject{
// 结果
private Object response;
/**
* 获取值
* @param timeout 超时的时间
* @return 返回的结果
*/
public Object get(long timeout) {
synchronized (this){
// 开始时间
long start=System.currentTimeMillis();
// 经历的时间
long passedTime=0;
// 是否得到结果
while (response==null) {
// 这一轮循环应该等待的时间
long waitTime=timeout-passedTime;
// 如果经历时间超过 预设的超时时间
if (waitTime<=0) {
// 直接退出
break;
}
try {
// 防止虚假唤醒
this.wait(waitTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 求得经历的时间
passedTime=System.currentTimeMillis()-start;
}
// 不为空 返回此结果
return response;
}
}
// 产生结果
public void complete(Object response) {
synchronized (this) {
this.response=response;
// 唤醒等待结果的线程
this.notifyAll();
}
}
}
设置超时时间:2000(2s)
guardedObject.get(2000);
15:48:00.528 [t1] INFO org.lc.wait_notify.T4 - 等待结果...
15:48:00.528 [t2] INFO org.lc.wait_notify.T4 - 执行下载
15:48:02.081 [t1] INFO org.lc.wait_notify.T4 - 下载的内容:[<!DOCTYPE html>, <!--STATUS OK--><html> <head><meta http-equiv=content-type content=text/html;charset=utf-8><meta http-..
设置超时时间:1000(1s)
guardedObject.get(1000);
15:53:50.537 [t2] INFO org.lc.wait_notify.T4 - 执行下载
15:53:50.537 [t1] INFO org.lc.wait_notify.T4 - 等待结果...
15:53:51.541 [t1] INFO org.lc.wait_notify.T4 - 下载的内容:已超时...
③应用 - join原理
Thread t1=new Thread(()->{});
t1.join(2000);
源码如下:
public final synchronized void join(long millis)
throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;
if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}
//如果没有传入过期时间
if (millis == 0) {
//轮询 查询该t1线程是否消亡
while (isAlive()) {
//主线程一直阻塞
wait(0);
}
} else {
//调用者线程进入t1的waitSet等待,直到t1运行结束
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
//休眠指定的实现
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}
join方法的本质调用的是Object中的wait方法实现线程的阻塞,但是我们需要知道的是,调用wait方法必须要获取锁,所以join方法是被synchronized修饰的,synchronized修饰在方法层面相当于synchronized(this),this就是t1本身的实例。 实际上主线程会持有t1这个对象的锁(即把当前线程对象当锁),然后调用wait方法去阻塞,而这个方法的调用者是在主线程中的。所以造成主线程阻塞。
④应用- 多任务版 GuardedObject
12、异步模式之生产者/消费者
①定义
要点
- 与前面的保护性暂停中的 GuardObject 不同,不需要产生结果和消费结果的线程一一对应
- 消费队列可以用来平衡生产和消费的线程资源
- 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
- 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
- JDK 中各种阻塞队列,采用的就是这种模式
②实例
- 定义消息
//定义消息
final class Message{
private int id;
private Object value;
public Message(int id, Object value) {
this.id = id;
this.value = value;
}
public int getId() {
return id;
}
public Object getValue() {
return value;
}
@Override
public String toString() {
return "Message{" +
"id=" + id +
", value=" + value +
'}';
}
}
- 消息队列
@Slf4j
class MessageQueue{
// 消息的队列集合
//双向队列
private LinkedList<Message> list=new LinkedList<>();
//队列容量
private int capcity;
public MessageQueue(int capcity) {
this.capcity = capcity;
}
//获取消息
public Message take(){
synchronized (list) {
// 判断队列是否为空
while (list.isEmpty()) {
try {
log.info("队列为空,消费者线程等待...");
// 为空一直等待
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 从队列的头部取
Message message = list.removeFirst();
log.info("已消费消息{}",message);
// 唤醒存入消息的线程
list.notifyAll();
return message;
}
}
//存入消息
public void put(Message message) {
synchronized (list) {
// 检查队列是否已满
while (list.size() == capcity) {
try {
log.info("队列已满,生产者等待...");
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 从队列的尾部加
list.addLast(message);
log.info("已生产消息{}",message);
// 唤醒取出消息的线程
list.notifyAll();
}
}
}
- 测试
public class T6 {
public static void main(String[] args) throws InterruptedException {
MessageQueue queue=new MessageQueue(2);
for (int i = 0; i < 3; i++) {
int id=i+1;
new Thread(()->{
queue.put(new Message(id,"值:"+id ));
},"生产者:"+id).start();
}
Thread.sleep(1000);
new Thread(()->{
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
queue.take();
}
},"消费者").start();
}
}
20:52:02.285 [生产者:1] INFO org.lc.wait_notify.MessageQueue - 已生产消息Message{id=1, value=值:1}
20:52:02.290 [生产者:3] INFO org.lc.wait_notify.MessageQueue - 已生产消息Message{id=3, value=值:3}
20:52:02.290 [生产者:2] INFO org.lc.wait_notify.MessageQueue - 队列已满,生产者等待...
20:52:04.285 [消费者] INFO org.lc.wait_notify.MessageQueue - 已消费消息Message{id=1, value=值:1}
20:52:04.285 [生产者:2] INFO org.lc.wait_notify.MessageQueue - 已生产消息Message{id=2, value=值:2}
20:52:05.286 [消费者] INFO org.lc.wait_notify.MessageQueue - 已消费消息Message{id=3, value=值:3}
20:52:06.287 [消费者] INFO org.lc.wait_notify.MessageQueue - 已消费消息Message{id=2, value=值:2}
20:52:07.287 [消费者] INFO org.lc.wait_notify.MessageQueue - 队列为空,消费者线程等待...
13、park和unpark
①基本使用
它们是 LockSupport 类中的方法
// 暂停当前线程
LockSupport.park();
// 恢复某个线程的运行
LockSupport.unpark(暂停线程对象)
- 先 park 后 unpark
@Slf4j
public class T1 {
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
log.info("start...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("park...");
LockSupport.park();
log.info("resume...");
}, "t1");
t1.start();
Thread.sleep(2000);
log.info("unpark");
LockSupport.unpark(t1);
}
}
21:09:20.417 [t1] INFO org.lc.park_unpark.T1 - start...
21:09:21.421 [t1] INFO org.lc.park_unpark.T1 - park...
21:09:22.415 [main] INFO org.lc.park_unpark.T1 - unpark
21:09:22.415 [t1] INFO org.lc.park_unpark.T1 - resume...
- 先 unpark 后 park 。也能唤醒park进程
@Slf4j
public class T1 {
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
log.info("start...");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("park...");
LockSupport.park();
log.info("resume...");
}, "t1");
t1.start();
Thread.sleep(1000);
log.info("unpark");
LockSupport.unpark(t1);
}
}
21:11:05.473 [t1] INFO org.lc.park_unpark.T1 - start...
21:11:06.473 [main] INFO org.lc.park_unpark.T1 - unpark
21:11:07.476 [t1] INFO org.lc.park_unpark.T1 - park...
21:11:07.476 [t1] INFO org.lc.park_unpark.T1 - resume...
②基本原理
每个线程都有自己的一个 Parker 对象,由三部分组成 _counter
, _cond
和 _mutex
打个比喻
线程就像一个旅人,Parker 就像他随身携带的背包,条件变量就好比背包中的帐篷。_counter 就好比背包中 的备用干粮(0 为耗尽,1 为充足)
调用 park 就是要看需不需要停下来歇息
- 如果备用干粮耗尽,那么钻进帐篷歇息
- 如果备用干粮充足,那么不需停留,继续前进
调用 unpark,就好比令干粮充足
- 如果这时线程还在帐篷,就唤醒让他继续前进
- 如果这时线程还在运行,那么下次他调用 park 时,仅是消耗掉备用干粮,不需停留继续前进
- 因为背包空间有限,多次调用 unpark 仅会补充一份备用干粮
③与Object的wait¬ify相比
- wait,notify 和 notifyAll 必须配合 Object Monitor 一起使用,而 park,unpark 不必
- park & unpark 是以线程为单位来【阻塞】和【唤醒】线程,而 notify 只能随机唤醒一个等待线程,notifyAll 是唤醒所有等待线程,就不那么【精确】
- park & unpark 可以先 unpark,而 wait & notify 不能先 notify
14、活跃性
①多把不相干的锁
将锁的粒度细分
- 好处,是可以增强并发度
- 坏处,如果一个线程需要同时获得多把锁,就容易发生死锁
共用一把锁this:
public class T1 { public static void main(String[] args) { Room room=new Room(); new Thread(()->{ try { room.sleep(); } catch (InterruptedException e) { e.printStackTrace(); } },"t1").start(); new Thread(()->{ try { room.study(); } catch (InterruptedException e) { e.printStackTrace(); } },"t2").start(); } } @Slf4j class Room{ public void sleep() throws InterruptedException { synchronized (this) { log.info("sleep..."); Thread.sleep(2000); } } public void study() throws InterruptedException { synchronized (this) { log.info("study..."); Thread.sleep(2000); } } }
20:03:29.176 [t2] INFO org.lc.deathLock.Room - study... 20:03:31.176 [t1] INFO org.lc.deathLock.Room - sleep...
我们发现始终是顺序执行的,即是互斥的。但是两个线程业务并不相干。所有我们可以使用不同的锁。
多个锁:
修改Room类
@Slf4j class Room{ public final static Object sleepRoom=new Object(); public final static Object studyRoom=new Object(); public void sleep() throws InterruptedException { synchronized (sleepRoom) { log.info("sleep..."); Thread.sleep(2000); } } public void study() throws InterruptedException { synchronized (studyRoom) { log.info("study..."); Thread.sleep(2000); } } }
20:03:29.176 [t2] INFO org.lc.deathLock.Room - study... 20:03:29.176 [t1] INFO org.lc.deathLock.Room - sleep...
②死锁
一个线程需要同时获取多把锁,这时就容易发生死锁
t1 线程 获得 o1对象 锁,接下来想获取 o2对象 的锁
t2 线程 获得 o2对象 锁,接下来想获取 o1对象 的锁
@Slf4j
public class T2 {
static Object o1=new Object();
static Object o2=new Object();
public static void main(String[] args) {
new Thread(()->{
synchronized (o1){
log.info("get lock o1");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (o2){
log.info("get lock o2");
}
}
},"t1").start();
new Thread(()->{
synchronized (o2){
log.info("get lock o2");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (o1){
log.info("get lock o1");
}
}
},"t2").start();
}
}
20:08:51.708 [t2] INFO org.lc.deathLock.T2 - get lock o2
20:08:51.708 [t1] INFO org.lc.deathLock.T2 - get lock o1
//程序未终止..
③活锁
活锁出现在两个线程互相改变对方的结束条件,后谁也无法结束
@Slf4j
public class T3 {
static int count=10;
public static void main(String[] args) {
new Thread(()->{
//期望到0就退出
while (count>0) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("count:{}", count);
count--;
}
},"t1").start();
new Thread(()->{
//期望到20就退出循环
while (count<20){
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("count:{}",count);
count++;
}
},"t2").start();
}
}
20:17:59.724 [t1] INFO org.lc.deathLock.T3 - count:11
20:17:59.811 [t2] INFO org.lc.deathLock.T3 - count:10
20:17:59.825 [t1] INFO org.lc.deathLock.T3 - count:11
//一值循环的增减
④饥饿
很多教程中把饥饿定义为,一个线程由于优先级太低,始终得不到 CPU 调度执行,也不能够结束,饥饿的情况不 易演示,讲读写锁时会涉及饥饿问题
看使用顺序加锁的方式解决之前的死锁问题
顺序加锁的解决方案
15、ReentrantLock(可重入锁)
相对于 synchronized
它具备如下特点
- 可中断
- 在阻塞中没有获得锁的线程可中断阻塞。
- 可以设置超时时间
- 没有获得锁的线程,最多等待的时间则会放弃获得锁
- 可以设置为公平锁
- 支持多个条件变量
与 synchronized 一样,都支持可重入 (轻量级锁)
基本语法:
ReentrantLock reentrantLock=new ReentrantLock();
// 获得锁
reentrantLock.lock();
try {
//临界区...
}finally {
//释放锁
reentrantLock.unlock();
}
①可重入
可重入是指同一个线程如果首次获得了这把锁,那么因为它是这把锁的拥有者,因此有权利再次获取这把锁
如果是不可重入锁,那么第二次获得锁时,自己也会被锁挡住
@Slf4j
public class T1 {
static ReentrantLock lock=new ReentrantLock();
public static void main(String[] args) {
m1();
}
static void m1() {
lock.lock();
try {
log.info("m1...");
m2();
}finally {
lock.unlock();
}
}
static void m2() {
lock.lock();
try {
log.info("m2...");
m3();
}finally {
lock.unlock();
}
}
static void m3() {
lock.lock();
try {
log.info("m3...");
}finally {
lock.unlock();
}
}
}
12:00:10.170 [main] INFO org.lc.Reentrant_Lock.T2 - m1...
12:00:10.173 [main] INFO org.lc.Reentrant_Lock.T2 - m2...
12:00:10.173 [main] INFO org.lc.Reentrant_Lock.T2 - m3...
②可打断
在阻塞中等待锁的线程被打断,即结束等待
lock.lockInterruptibly()
可打断获得锁的模式
可中断模式:
@Slf4j
public class T3 {
private static ReentrantLock lock=new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
Thread t1=new Thread(()->{
try {
//如果没有竞争那么此方法就会获取lock锁
//如果有竞争就进入阻塞队列,可以被其他线程用interrupt打断
log.info("尝试获取锁...");
lock.lockInterruptibly();
} catch (InterruptedException e) {
e.printStackTrace();
log.info("没有获得到锁...返回");
//这里我们被打断,后面的代码也就无需执行了
return;
}
try {
log.info("获取到锁...");
}finally {
lock.unlock();
}
},"t1");
//主线程先获取到锁
lock.lock();
t1.start();
Thread.sleep(1000);
//打断t1线程
t1.interrupt();
}
}
12:23:27.170 [t1] INFO org.lc.Reentrant_Lock.T3 - 尝试获取锁...
java.lang.InterruptedException
at ....
12:23:28.168 [t1] INFO org.lc.Reentrant_Lock.T3 - 没有获得到锁...返回
不可中断模式,那么即使使用了 interrupt 也不会让等待中断
@Slf4j
public class T3 {
private static ReentrantLock lock=new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
Thread t1=new Thread(()->{
try {
//如果没有竞争那么此方法就会获取lock锁
//如果有竞争就进入阻塞队列,可以被其他线程用interrupt打断
log.info("尝试获取锁...");
lock.lock();
} finally {
lock.unlock();
}
},"t1");
//主线程先获取到锁
lock.lock();
t1.start();
Thread.sleep(1000);
log.info("打断...");
t1.interrupt();
}
}
12:34:20.364 [t1] INFO org.lc.Reentrant_Lock.T3 - 尝试获取锁...
12:34:21.361 [main] INFO org.lc.Reentrant_Lock.T3 - 打断...
//阻塞的线程并没有被打断。
③锁超时
如果没有获得到锁,立刻失败返回
tryLock()
获取到锁返回true,否则返回false
@Slf4j
public class T4 {
private static ReentrantLock lock=new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
Thread t1=new Thread(()->{
log.info("尝试获取锁...");
if(!lock.tryLock()){
//没有获得锁 返回false
log.info("没有获得到锁...");
return;
}
try {
log.info("获得了锁...");
}
finally {
lock.unlock();
}
},"t1");
//主线程先获取到锁
lock.lock();
t1.start();
}
}
12:43:47.121 [t1] INFO org.lc.Reentrant_Lock.T4 - 尝试获取锁...
12:43:47.123 [t1] INFO org.lc.Reentrant_Lock.T4 - 没有获得到锁...
在指定的时间内等待锁,等待的时候可被打断。
tryLock(long timeout, TimeUnit unit)
@Slf4j
public class T4 {
private static ReentrantLock lock=new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
Thread t1=new Thread(()->{
try {
log.info("尝试获取锁...");
//最多等待3秒,没有获得到锁返回false
if (!lock.tryLock(3, TimeUnit.SECONDS)) {
//没有获得锁 返回false
log.info("没有获得到锁...");
//直接退出
return;
}
} catch (InterruptedException e) {
e.printStackTrace();
log.info("被打断...没有获得到锁");
// 打断后也应该返回
return;
}
try {
log.info("获得了锁...");
}
finally {
lock.unlock();
}
},"t1");
//主线程先获取到锁
lock.lock();
t1.start();
Thread.sleep(1000);
//主线程获得锁1s后打断在阻塞等待的t1线程
t1.interrupt();
}
}
12:53:08.668 [t1] INFO org.lc.Reentrant_Lock.T4 - 尝试获取锁...
java.lang.InterruptedException
at ....
12:53:09.666 [t1] INFO org.lc.Reentrant_Lock.T4 - 被打断...没有获得到锁
④公平锁
即在WaitSet中阻塞的线程争抢锁的时候是非公平的,即随机争抢。
而公平锁,阻塞线程在争抢锁的时候是按照阻塞队列的顺序来的。
ReentrantLock默认是不公平的
//设置公平锁
ReentrantLock lock = new ReentrantLock(true);
公平锁一般没有必要,会降低并发度。
⑤条件变量
synchronized 中也有条件变量,就是我们讲原理时那个 waitSet 休息室,当条件不满足时进入 waitSet 等待
ReentrantLock 的条件变量比 synchronized 强大之处在于,它是支持多个条件变量的,这就好比
- synchronized 是那些不满足条件的线程都在一间休息室等消息
- 而 ReentrantLock 支持多间休息室,有专门等烟的休息室、专门等早餐的休息室、唤醒时也是按休息室来唤 醒,即阻塞队列的存放位置可以分组,
使用要点(和synchronized类似,只是阻塞队列做了区分,然后需要手动释放锁)
- await 前需要获得锁
- await 执行后,会释放锁,进入 conditionObject 等待
- await 的线程被唤醒(signal)(或打断、或超时)取重新竞争 lock 锁
- 竞争 lock 锁成功后,从 await 后继续执行
@Slf4j
public class T6 {
// 是否有烟
static boolean hasSmoke=false;
// 是否有饭
static boolean hasLunch=false;
// 可重入锁
static ReentrantLock lock=new ReentrantLock();
// 抽烟的队列
static Condition somekRoom=lock.newCondition();
// 吃饭的队列
static Condition lunchRoom=lock.newCondition();
public static void main(String[] args) throws InterruptedException {
new Thread(()->{
// 尝试获得锁
lock.lock();
try {
log.info("有烟没?{}",hasSmoke);
while (!hasSmoke) {
try {
log.info("烟没到,休息下...");
//没烟。等烟
somekRoom.await();
} catch (InterruptedException e) {
//可打断
e.printStackTrace();
}
}
//有烟
log.info("有烟,开始干活!");
}finally {
// 释放锁
lock.unlock();
}
},"烟杆子").start();
new Thread(()->{
// 尝试获得锁
lock.lock();
try {
log.info("有饭没?{}",hasLunch);
while (!hasLunch) {
try {
log.info("没饭,睡一会...");
lunchRoom.await();
} catch (InterruptedException e) {
// 可被打断
e.printStackTrace();
}
}
log.info("饭来了,恰饭!");
}finally {
// 释放锁
lock.unlock();
}
},"饭桶").start();
new Thread(()->{
// 获取锁
lock.lock();
try {
// 送烟
hasSmoke=true;
// 唤醒阻塞的烟杆子
// 这里signal()只会唤醒somkeRoom中的一个
somekRoom.signal();
}finally {
lock.unlock();
}
},"工具人1").start();
Thread.sleep(2000);
new Thread(()->{
// 获得锁
lock.lock();
try {
// 送饭
hasLunch=true;
// 关系lunchRoom中等待的队列
lunchRoom.signal();
}finally {
lock.unlock();
}
},"工具人2").start();
}
}
16:27:44.550 [烟杆子] INFO org.lc.Reentrant_Lock.T6 - 有烟没?false
16:27:44.554 [烟杆子] INFO org.lc.Reentrant_Lock.T6 - 烟没到,休息下...
16:27:44.554 [饭桶] INFO org.lc.Reentrant_Lock.T6 - 有饭没?false
16:27:44.554 [饭桶] INFO org.lc.Reentrant_Lock.T6 - 没饭,睡一会...
16:27:44.554 [烟杆子] INFO org.lc.Reentrant_Lock.T6 - 有烟,开始干活!
16:27:46.548 [饭桶] INFO org.lc.Reentrant_Lock.T6 - 饭来了,恰饭!
16、设计模式-同步模式之顺序控制
①固定运行顺序
题目:必须先打印2后打印1
1、使用wait/notify
@Slf4j
public class T1 {
static Object lock=new Object();
static boolean isRun=false;
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
synchronized (lock) {
while (!isRun) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.info("RunWith 1...");
}
}, "t1");
Thread t2=new Thread(()->{
synchronized (lock) {
log.info("RunWith 2...");
if(!isRun){
isRun=true;
lock.notify();
}
}
},"t2");
t1.start();
t2.start();
}
}
16:48:20.774 [t2] INFO org.lc.desiger_.T1 - RunWith 2...
16:48:20.776 [t1] INFO org.lc.desiger_.T1 - RunWith 1...
2、使用ReentrantLock
@Slf4j
public class T2 {
static ReentrantLock lock=new ReentrantLock();
static Condition waitSet=lock.newCondition();
static boolean isRun=false;
public static void main(String[] args) {
new Thread(()->{
lock.lock();
try {
while (!isRun){
try {
waitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.info("runwith 1");
}finally {
lock.unlock();
}
},"t1").start();
new Thread(()->{
lock.lock();
try {
if (!isRun) {
isRun=true;
log.info("runwith 2");
waitSet.signal();
}
}finally {
lock.unlock();
}
},"t1").start();
}
}
16:59:17.669 [t1] INFO org.lc.desiger_.T2 - runwith 2
16:59:17.671 [t1] INFO org.lc.desiger_.T2 - runwith 1
3、使用park和unpark
@Slf4j
public class T3 {
public static void main(String[] args) {
Thread t1= new Thread(() -> {
LockSupport.park();
log.info("runwith 1");
}, "t1");
t1.start();
new Thread(()->{
log.info("runwith 2");
LockSupport.unpark(t1);
},"t2").start();
}
}
②交替输出
题目:线程 1 输出 a 5 次,线程 2 输出 b 5 次,线程 3 输出 c 5 次。
现在要求输出 abcabcabcabcabc 怎么实现
1、使用wait/notify
public class T4 {
public static void main(String[] args) {
// 下一定义标记为1 即先打印a
PrintABC printABC = new PrintABC(1, 5);
new Thread(()->{
printABC.print("a", 1, 2);
},"t1").start();
new Thread(()->{
printABC.print("b", 2, 3);
},"t2").start();
new Thread(()->{
printABC.print("c", 3, 1);
},"t3").start();
}
}
/**依次打印abc五次
* 当前打印标记 下一打印标记
* a 1 2
* b 2 3
* c 3 1
*/
class PrintABC{
// 当前打印标记
int flag;
// 下一打印标记
int nextFlag;
// 循环次数
int loopNumber;
public PrintABC(int nextFlag, int loopNumber) {
this.nextFlag = nextFlag;
this.loopNumber = loopNumber;
}
public void print(String word, int flag, int nextFlag){
for (int i = 0; i < loopNumber; i++) {
synchronized (this) {
// 如果当前传过来的标记和下一打印标记不一致
while (this.nextFlag!=flag) {
try {
// 等待
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.print(word);
this.nextFlag=nextFlag;
// 唤醒所有进程
this.notifyAll();
}
}
}
}
abcabcabcabcabc
2、使用ReentrantLock
public class T5 {
public static void main(String[] args) throws InterruptedException {
ReentrantPrint reentrantPrint = new ReentrantPrint(5);
Condition a = reentrantPrint.newCondition();
Condition b = reentrantPrint.newCondition();
Condition c = reentrantPrint.newCondition();
new Thread(()->{
reentrantPrint.print("a", a, b);
},"t1").start();
new Thread(()->{
reentrantPrint.print("b", b, c);
},"t2").start();
new Thread(()->{
reentrantPrint.print("c", c, a);
},"t3").start();
//先让上述线程都进入各自的阻塞队列
Thread.sleep(2000);
// 主线程启动 先唤醒t1打印a
reentrantPrint.lock();
try {
// 唤醒a队列
a.signal();
System.out.println("start...");
}finally {
//释放锁
reentrantPrint.unlock();
}
}
}
class ReentrantPrint extends ReentrantLock {
// 循环次数
int loopNumber;
public ReentrantPrint(int loopNumber) {
this.loopNumber = loopNumber;
}
/**
* 打印的方法
* @param word 打印的内容
* @param cond 当前线程睡眠的队列
* @param nextCond 下一个唤醒的队列
*/
public void print(String word,Condition cond,Condition nextCond) {
for (int i = 0; i < loopNumber; i++) {
// 尝试获得锁
lock();
try {
try {
//a等待的队列
cond.await();
// 打印单词
System.out.print(word);
// 下一个唤醒的队列
nextCond.signal();
} catch (InterruptedException e) {
e.printStackTrace();
}
}finally {
unlock();
}
}
}
}
start...
abcabcabcabcabc
3、park和unpark
public class T6 {
static Thread t1;
static Thread t2;
static Thread t3;
public static void main(String[] args) throws InterruptedException {
ParkPrint parkPrint = new ParkPrint(5);
t1 = new Thread(() -> {
parkPrint.print("a", t2);
}, "t1");
t2 = new Thread(() -> {
parkPrint.print("b", t3);
}, "t2");
t3 = new Thread(() -> {
parkPrint.print("c", t1);
}, "t3");
t1.start();
t2.start();
t3.start();
// 保证上述线程启动完毕
Thread.sleep(2000);
// 先唤醒t1线程 打印a
LockSupport.unpark(t1);
}
}
class ParkPrint{
int loopNumber;
public ParkPrint(int loopNumber) {
this.loopNumber = loopNumber;
}
public void print(String word,Thread nextThread){
for (int i = 0; i < loopNumber; i++) {
// 先阻塞
LockSupport.park();
System.out.print(word);
LockSupport.unpark(nextThread);
}
}
}
abcabcabcabcabc
17、本章小结
Ⅳ、共享模型之内存
1、java内存模型(JMM)
JMM: java memory model 它定义了主存(线程共享的数据、静态成员变量,成员变量)、工作内存(局部变量)抽象概念,底层对应着 CPU 寄存器、缓存、硬件内存、 CPU 指令优化等。
JMM 体现在以下几个方面
- 原子性 - 保证指令不会受到线程上下文切换的影响
- 可见性 - 保证指令不会受 cpu 缓存的影响
- 有序性 - 保证指令不会受 cpu 指令并行优化的影响
2、可见性和有序性和原子性
1)可见性
①退不出的循环
先来看一个现象,按道理来说,主线程经过1秒后修改flag变量,线程t1终止,但是main 线程对 flag变量的修改对于 t1 线程不可见,导致了 t 1线程无法停止:
public class T1 {
static boolean flag = true;
public static void main(String[] args) throws InterruptedException {
new Thread(()->{
while (flag) {
}
},"t1").start();
Thread.sleep(1000);
flag=false;
}
}
原因:
初始状态t1线程开始从主存flag的值到工作内存中
因为t1线程要频繁的从主存中读取flag的值,
JIT
(just-in-time compilation)即时编译器会将flag的值缓存到自己的工作内存中的高速缓存中,减少对主存中flag的访问,以提高效率
1 秒之后,main 线程修改了 flag的值,并同步至主存,而 t1线程 是从自己工作内存中的高速缓存中读取这个变量 的值,结果永远是旧值
解决方法:
使用volatile关键字
加上**volatile(易变的)**关键字,只能修饰成员变量,不能修饰局部变量
volatile static boolean flag = true;
使用加synchronized锁的方式
@Slf4j
public class T1 {
static boolean flag = true;
static Object lock=new Object();
public static void main(String[] args) throws InterruptedException {
new Thread(()->{
while (true) {
synchronized (lock) {
if (!flag) {
break;
}
}
}
},"t1").start();
Thread.sleep(1000);
log.info("退出...");
flag=false;
}
}
2)有序性
JVM 会在不影响正确性的前提下,可以调整语句的执行顺序,思考下面一段代码
static int i;
static int j;
// 在某个线程内执行如下赋值操作 i = ...; j = ...;
可以看到,至于是先执行 i 还是 先执行 j ,对终的结果不会产生影响。所以,上面代码真正执行时,既可以是
i = ...;
j = ...;
也可以是
j = ...;
i = ...;
这种特性称之为『指令重排』,多线程下『指令重排』会影响正确性。
使用:volatile 修饰的变量,可以防止指令重排
即被volatile修饰的变量在修改之前的代码都是有序的(只是在本线程有效),不可被jvm进行优化,重新排列运行
3)原子性
volatile不能保证原子性
原子性:是指一个操作是不可中断的。即使是多个线程一起执行的时候,一个操作一旦开始,就不会被其他线程干扰。
比如,对于一个静态全局变量int i,两个线程同时对它赋值,线程A给他赋值为1,线程B给他赋值为-1。那么不管这两个线程
以何种方式。何种步调工作,i的值要么是1,要么是-1.线程A和线程B之间是没有干扰的。这就是原子性的一个特点,不可被中断。
3、synchronized和volatile区别
前面例子体现的实际就是可见性,它保证的是在多个线程之间,一个线程对 volatile 变量的修改对另一个线程可 见, 不能保证原子性,仅用在一个写线程,多个读线程的情况: 上例从字节码理解是这样的:
getstatic run // 线程 t 获取 run true
getstatic run // 线程 t 获取 run true
getstatic run // 线程 t 获取 run true
getstatic run // 线程 t 获取 run true
putstatic run // 线程 main 修改 run 为 false, 仅此一次
getstatic run // 线程 t 获取 run false
比较一下之前我们将线程安全时举的例子:两个线程一个 i++ 一个 i-- ,只能保证看到新值,不能解决指令交错
// 假设i的初始值为0
getstatic i // 线程2-获取静态变量i的值 线程内i=0
getstatic i // 线程1-获取静态变量i的值 线程内i=0
iconst_1 // 线程1-准备常量1
iadd // 线程1-自增 线程内i=1
putstatic i // 线程1-将修改后的值存入静态变量i 静态变量i=1
iconst_1 // 线程2-准备常量1
isub // 线程2-自减 线程内i=-1
putstatic i // 线程2-将修改后的值存入静态变量i 静态变量i=-1
synchronized和volatile区别:
- synchronized语句块既可以保证代码块的原子性,也同时保证代码块内变量的可见性,也可以保证有序性(但是需要将修改的变量全部交由synchronized管理)。但缺点是synchronize属于重量级的操作,性能相对较低。
- volatile并不能保证原子性,但是能够保证其有序性和可见性。他适用于一个线程读,另一个线程写的情况
4、终止模式之两阶段终止模式(volatile)
我们之前使用isInterrupted
来实现,现在我们使用volatile
实现监视器功能
public class T2 {
public static void main(String[] args) throws InterruptedException {
LogMonitor logMonitor=new LogMonitor();
logMonitor.start();
//5秒后停止监控
Thread.sleep(5000);
logMonitor.stop();
}
}
@Slf4j
class LogMonitor{
private Thread thread;
private volatile boolean flag=false;
public void start(){
thread=new Thread(()->{
while (true) {
if(flag){
log.info("停止监控...");
break;
}
try {
Thread.sleep(1000);
log.info("监控记录中...");
} catch (InterruptedException e) {
}
}
},"监视器");
thread.start();
}
public void stop() {
flag=true;
//打断线程 避免在睡眠后又一次记录
thread.interrupt();
}
}
19:50:16.992 [监视器] INFO org.lc.volatile_.LogMonitor - 监控记录中...
19:50:17.996 [监视器] INFO org.lc.volatile_.LogMonitor - 监控记录中...
19:50:18.997 [监视器] INFO org.lc.volatile_.LogMonitor - 监控记录中...
19:50:19.998 [监视器] INFO org.lc.volatile_.LogMonitor - 监控记录中...
19:50:20.991 [监视器] INFO org.lc.volatile_.LogMonitor - 停止监控...
5、同步模式之Balking(犹豫模式)
①定义:
Balking (犹豫)模式用在一个线程发现另一个线程或本线程已经做了某一件相同的事,那么本线程就无需再做 了,直接结束返回
②监控日志-案例
防止创建多个启动实例 synchronized+volatile
synchronzed保证原子性,volatile保证可见性
public class T2 {
public static void main(String[] args) throws InterruptedException {
LogMonitor logMonitor=new LogMonitor();
// 这里启动多个实例 还是按照每秒执行一次来监控
logMonitor.start();
logMonitor.start();
logMonitor.start();
//5秒后停止监控
// Thread.sleep(5000);
// logMonitor.stop();
}
}
@Slf4j
class LogMonitor{
private Thread thread;
private volatile boolean flag=false;
// 判断是否已经启动了该监视器
private boolean isStaring=false;
public void start(){
//多个线程来访问修改isStrating,会产生线程安全问题 需要使用synchronize来保证原子性
synchronized (this) {
if (isStaring) {
// 已经启动了 直接返回
return;
}
}
isStaring=true;
thread=new Thread(()->{
while (true) {
if(flag){
log.info("停止监控...");
break;
}
try {
Thread.sleep(1000);
log.info("监控记录中...");
} catch (InterruptedException e) {
}
}
},"监视器");
thread.start();
}
public void stop() {
flag=true;
//打断线程 避免在睡眠后又一次记录
thread.interrupt();
}
}
20:09:15.560 [监视器] INFO org.lc.volatile_.LogMonitor - 监控记录中...
20:09:16.566 [监视器] INFO org.lc.volatile_.LogMonitor - 监控记录中...
20:09:17.566 [监视器] INFO org.lc.volatile_.LogMonitor - 监控记录中...
...
③线程安全的单例模式
效率太低,详细请看7下的第④个。
public class SafeSingleton {
private static SafeSingleton safeSingleton=null;
private SafeSingleton() {
}
public static synchronized SafeSingleton getInstance() {
if (safeSingleton != null) {
return safeSingleton;
}
return safeSingleton=new SafeSingleton();
}
}
7、volatile原理
①读屏障和写屏障的可见性和有序性
- 可见性
- 写屏障(sfence)保证在该屏障之前的,对共享变量的改动,都同步到主存当中
- 而读屏障(lfence)保证在该屏障之后,对共享变量的读取,加载的是主存中新数据
- 有序性
- 写屏障会确保指令重排序时,不会将写屏障之前的代码排在写屏障之后
- 读屏障会确保指令重排序时,不会将读屏障之后的代码排在读屏障之前
volatile 的底层实现原理**是内存屏障,**Memory Barrier(Memory Fence)
- 对 volatile 变量的写指令后会加入写屏障
- 对 volatile 变量的写指令后会加入写屏障
②如何保证可见性
写屏障(sfence)保证在该屏障之前的,对共享变量的改动,都同步到主存当中
private volatile boolena ready=false; public void actor2(I_Result r) { num = 2; //写到主存中 ready = true; // ready 是 volatile 赋值带写屏障 // 写屏障 }
而读屏障(lfence)保证在该屏障之后,对共享变量的读取,加载的是主存中新数据
public void actor1(I_Result r) { // 读屏障 // ready 是 volatile 读取值带读屏障 if(ready) { //num也从主存中读 r.r1 = num + num; } else { r.r1 = 1; } }
<img src="http://img.louchen.top/2020/05/20200528102536.png" style="zoom: 67%;" />
#### ③如何保证有序性
- 写屏障会确保指令重排序时,**不会将写屏障之前的代码排在写屏障之后**
- ```java
public void actor2(I_Result r) {
num = 2;
ready = true; // ready 是 volatile 赋值带写屏障
// 写屏障
}
读屏障会确保指令重排序时,不会将读屏障之后的代码排在读屏障之前
public void actor1(I_Result r) { // 读屏障 // ready 是 volatile 读取值带读屏障 if(ready) { r.r1 = num + num; } else { r.r1 = 1; } }
总结: 被volatile修饰的成员变量,只能在本线程保证有序性和可见性,不能解决指令交错。
- 写屏障:保证被volatile的修饰的成员变量在改变之前的代码都是写到主存中。保证在修改该变量之前的代码不会排在该变量之后。
- 读屏障:保证被volatile的修饰的成员变量在读取之后的代码都是从主存中读。保证在读该变量之后的代码不会排在该变量之前。
④单例模式的 double-checked locking (双重检查)
之前创建的单例的方式: 这种方式效率非常低。
public class SafeSingleton {
private static SafeSingleton safeSingleton=null;
private SafeSingleton() {
}
public static SafeSingleton getInstance() {
synchronized (SafeSingleton.class) {
if (safeSingleton == null) {
return safeSingleton==new SafeSingleton();;
}
}
return safeSingleton
}
}
改进:
public class SafeSingleton {
private static SafeSingleton safeSingleton=null;
private SafeSingleton() {
}
public static SafeSingleton getInstance() {
//多个线程访问 如果实例不为空则直接返回,后面的线程无需再竞争锁
if(safeSingleton==null) {
// 第一个进来的线程 获得到锁创建
synchronized (SafeSingleton.class) {
if (safeSingleton == null) {
return safeSingleton=new SafeSingleton();
}
}
}
return safeSingleton;
}
}
以上的实现特点是:
- 懒惰实例化
- 首次使用 getInstance() 才使用 synchronized 加锁,后续使用时无需加锁
- 有隐含的,但很关键的一点:第一个 if 使用了 INSTANCE 变量,是在同步块之外
但在多线程环境下,上面的代码是有问题的,getInstance 方法对应的字节码为:
其中
- 17 表示创建对象,将对象引用入栈 // new Singleton
- 20 表示复制一份对象引用 // 引用地址
- 21 表示利用一个对象引用,调用构造方法
- 24 表示利用一个对象引用,赋值给 static INSTANCE
也许 jvm 会优化为:先执行 24,再执行 21。如果两个线程 t1,t2 按如下时间序列执行:
通俗一点就是,线程t2
执行第一个if(safeSingleton==null)
时,线程t1
已经执行到第二个if(safeSingleton==null)
中的safeSingleton=new SafeSingleton();
,但是由于jvm指令重排的关系,可能会先执行对safeSingleton
的引用赋值情况,但是new SafeSingleton()
还未执行完毕。所以线程t2
拿到返回的实例为未被初始化完毕的实例。
**注意:**对 INSTANCE 使用 volatile 修饰即可,可以禁用指令重排,但要注意在 JDK 5 以上的版本的 volatile 才会真正有效
最终解决方案:
public class SafeSingleton {
//给实例加上volatile关键字 加入内存屏障
private volatile static SafeSingleton safeSingleton=null;
private SafeSingleton() {
}
public static SafeSingleton getInstance() {
// 可见性:读:该屏障后面的代码都会从主存中读
// 有序性:读:该屏障后面的代码都会在该屏障之后执行
if(safeSingleton==null) {
synchronized (SafeSingleton.class) {
if (safeSingleton == null) {
//可见性:写:该屏障之前的代码都会写到主存中
//有序性,写:该屏障之前的代码后会在该屏障之前执行
//保证对变量safeSingleton的赋值为有序性,即先调用构造方法,再赋值引用给safeSingleton
return safeSingleton=new SafeSingleton();
}
}
}
return safeSingleton;
}
}
8、happens-before规则
happens-before 规定了对共享变量的写操作对其它线程的读操作可见,它是可见性与有序性的一套规则总结,抛 开以下 happens-before 规则,JMM 并不能保证一个线程对共享变量的写,对于其它线程对该共享变量的读可见
- 线程解锁 m 之前对变量的写,对于接下来对 m 加锁的其它线程对该变量的读可见
synchronized能够保证原子性,可见性,有序性
static int x;
static Object m = new Object();
new Thread(()->{
synchronized(m) {
x = 10;
} },"t1").start();
new Thread(()->{
synchronized(m) {
System.out.println(x);
} },"t2").start();
- 线程对 volatile 变量的写,对接下来其它线程对该变量的读可见
volatile static int x;
new Thread(()->{
x = 10;
},"t1").start();
new Thread(()->{
System.out.println(x);
},"t2").start();
- 线程 start 前对变量的写,对该线程开始后对该变量的读可见
static int x;
x = 10;
new Thread(()->{
System.out.println(x);
},"t2").start();
- 线程结束前对变量的写,对其它线程得知它结束后的读可见(比如其它线程调用 t1.isAlive() 或 t1.join()等待 它结束)
static int x;
Thread t1 = new Thread(()->{
x = 10;
},"t1"); t1.start();
t1.join();
//读线程已经执行完修改的变量
System.out.println(x);
- 线程 t1 打断 t2(interrupt)前对变量的写,对于其他线程得知 t2 被打断后对变量的读可见(通过 t2.interrupted 或 t2.isInterrupted)
static int x;
public static void main(String[] args) {
Thread t2 = new Thread(()->{
while(true) {
if(Thread.currentThread().isInterrupted()) {
//
System.out.println(x);
break;
}
}
},"t2");
t2.start();
new Thread(()->{
sleep(1);
x = 10;
//内部加了synchronized关键字
t2.interrupt();
},"t1").start();
while(!t2.isInterrupted()) {
Thread.yield();
}
System.out.println(x);
}
- 对变量默认值(0,false,null)的写,对其它线程对该变量的读可见
- 具有传递性,配合 volatile 的防指令重排,有下面的例子
volatile static int x; static int y;
new Thread(()->{
y = 10;
x = 20;
},"t1").start();
new Thread(()->{
// x=20 对 t2 可见, 同时 y=10 也对 t2 可见
System.out.println(x);
},"t2").start();
注意:变量都是指成员变量或静态成员变量
9、习题
①balking 模式习题
希望 doInit() 方法仅被调用一次,下面的实现是否有问题,为什么?
class TestVolatile{
volatile Boolean initialized=false;
void init() {
if (initialized) {
return;
}
doInit();
initialized=true;
}
private void doInit() {
//初始化的一些操作
}
}
上述代码存在多线程并发操作,doInit()调用多次的情况, 修改如下:
class TestVolatile{
volatile boolean initialized=false;
void init() {
if (initialized) {
return;
}
synchronized (TestVolatile.class) {
if(initialized){
return;
}
doInit();
initialized=true;
}
}
private void doInit() {
//初始化的一些操作
}
}
②线程安全单例习题
单例模式有很多实现方法,饿汉、懒汉、静态内部类、枚举类,试分析每种实现下获取单例对象(即调用 getInstance)时的线程安全,并思考注释中的问题
- 饿汉式:类加载就会导致该单实例对象被创建
- 懒汉式:类加载不会导致该单实例对象被创建,而是首次使用该对象时才会创建
实现1(饿汉式):
//1、为什么加final? 答:防止子类对父类覆盖创建对象方法的干扰
//2、如果实现了序列化接口,还要做什么来防止序列化破坏单例? 答:加上一个readResovle方法
public final class Singleton implements Serializable {
//3、为什么设置为私有? 答:防止通过new无限制的创建对象。 是否能防止反序列化创建新的实例? 答:不能
private Singleton() {
}
//4、这样初始化能否保证线程安全问题? 答:在类加载的时候,该对象即被jvm创建,所有为线程安全的
private static final Singleton INSTANCE = new Singleton();
//5、为什么使用方法提供实例,而不是直接通过设置字段成员公有提供实例? 答:更好的封装性,扩展性
public static Singleton getInstance() {
return INSTANCE;
}
//若该类有readResolve方法,则使用该方法返回的对象,而不是使用序列化后的对象实例
public Object readResolve() {
return INSTANCE;
}
}
实现2(懒汉式):
public class Singleton {
private Singleton() {
}
private static Singleton INSTANCE=null;
//对Singleton类对象上锁,性能比较低。
public static synchronized Singleton getInstance() {
if (INSTANCE != null) {
return INSTANCE;
}
INSTANCE=new Singleton();
return INSTANCE;
}
}
实现3(double-checked locking 双重检查)懒汉
public class Singleton {
private Singleton() {
}
//加上volatile,防止在创建对象的时候指令重排
private static volatile Singleton INSTANCE=null;
public static Singleton getInstance() {
if (INSTANCE != null) {
return INSTANCE;
}
synchronized (Singleton.class) {
if(INSTANCE!=null){
return INSTANCE;
}
//创建对象
INSTANCE = new Singleton();
return INSTANCE;
}
}
}
实现4(静态内部类)懒汉:
public class Singleton {
private Singleton() {
}
//使用静态内部类,对外部不可见。
//jvm本身对类加载是懒惰的,即只调用到getInstance方法时,使用到LazyHolder对象时该内部类才会被加载,里面的实例才会被加载
private static class LazyHolder{
static final Singleton INSTANCE=new Singleton();
}
//调用时才会加载此内部类
public static Singleton getInstance() {
return LazyHolder.INSTANCE;
}
}
实现5(枚举)
//反编译后的问题:继承Enum枚举类
//final enum Singleton extends Enum{
// 类加载时创建对象
// public final static enum Singleton INSTANCE;
// }
//1、枚举单例是如何实现限制实例个数的? 每个枚举实例,都是一个单例对象
//2、枚举单例在创建时是否有有并发问题? 没有,final static实例会在类初始化的时候创建
//3、枚举单例能否被反序列化破坏单例? 枚举类默认实现序列化接口,可被序列化和反序列,但是不破坏单例
//4、枚举能否通过反射破坏单例? 不能。
//5、枚举单例属于饿汉式
//6、枚举单例中可以加入一些字段和构造方法,来初始化单例的属性
public enum Singleton {
INSTANCE;
public static Singleton GetInstance(){
return INSTANCE;
}
}
class SingleTest{
public static void main(String[] args) {
Singleton instance = Singleton.GetInstance();
Singleton instance1 = Singleton.INSTANCE;
// true
System.out.println(instance==instance1);
}
}
Ⅴ、共享模型之无锁
1、存取款问题
①有锁方式synchronized
账户接口
interface Account {
//获取余额
Integer getBalance();
//取款
void withdraw(Integer amount);
static void demo(Account account) {
List<Thread> list = new ArrayList<>();
long start = System.currentTimeMillis();
for (int i = 0; i < 1000; i++) {
list.add(new Thread(() -> {
account.withdraw(10);
}));
}
list.forEach(Thread::start);
list.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long end = System.currentTimeMillis();
System.out.println(account.getBalance() + " cost:" + (end - start)+"ms");
}
}
实现接口
public class T1 {
public static void main(String[] args) {
AcccountSafe safe = new AcccountSafe(10000);
Account.demo(safe);
}
}
class AcccountSafe implements Account {
// 余额
private Integer balance;
public AcccountSafe(Integer balance) {
this.balance = balance;
}
@Override
public synchronized Integer getBalance() {
return this.balance;
}
@Override
public synchronized void withdraw(Integer amount) {
this.balance -= amount;
}
}
0 cost:126ms
②无锁方式CAS
账户接口 不变
实现接口
public class T1 {
public static void main(String[] args) {
AccountCAS cas=new AccountCAS(10000);
Account.demo(cas);
}
}
class AccountCAS implements Account{
//余额
//使用原子的Atomic
private AtomicInteger balance;
public AccountCAS(Integer balance) {
this.balance = new AtomicInteger(balance);
}
@Override
public Integer getBalance() {
//获取最新的值
return balance.get();
}
@Override
public void withdraw(Integer amount) {
//使用cas操作
while (true) {
//获取最新的余额
int pre=balance.get();
//取钱之后的余额
int next=pre-amount;
//期望最新值 操作完之后的值
if(balance.compareAndSet(pre, next)){
break;
}
}
}
}
0 cost:68ms
结果: 无锁的操作相比加锁确实效率提升了不少
0 cost:126ms //加锁
0 cost:68ms //无锁
2、CAS与Volatile
①CAS操作分析
前面看到的 AtomicInteger 的解决方法,内部并没有用锁来保护共享变量的线程安全。那么它是如何实现的呢?
@Override
public void withdraw(Integer amount) {
//不断循环重试,知道成功为止
while (true) {
//获取最新的余额 比如,拿到最新的值10000
int pre=balance.get();
//取钱之后的余额 操作 10000-10=9990
int next=pre-amount;
//当前线程读取到的最新余额 ; 操作完之后的余额
//compareAndSet这个操作是原子性的
//例如,当该线程t1读取到当前的余额和操作之后的余额得到compareAndSet(10000,9990)
//此外,线程t2再此原子操作之前修改了最新的余额为9990,即当前线程t1执行compareAndSet之前 //pre会和最新的期望余额值进行比较,如果不一致,则next作废,返回false,重试。否则没有其他线程对余额进行操作,返回true,退出循环
if(balance.compareAndSet(pre, next)){
break;
}
}
}
其中的关键是 compareAndSet,它的简称就是 CAS (也有 Compare And Swap 比较交换 的说法),它必须是原子操作。
我们可以发现在 AtomicInteger 中 是使用volatile保证可见性
public class AtomicInteger extends Number implements java.io.Serializable {
private static final long serialVersionUID = 6214790243416807050L;
// setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
private volatile int value;
//**
//**
}
注意:
其实 CAS 的底层是 lock cmpxchg 指令(X86 架构),在单核 CPU 和多核 CPU 下都能够保证【比较-交 换】的原子性。
在多核状态下,某个核执行到带 lock 的指令时,CPU 会让总线锁住,当这个核把此指令执行完毕,再 开启总线。这个过程中不会被线程的调度机制所打断,保证了多个线程对内存操作的准确性,是原子 的。
CAS 必须借助 volatile 才能读取到共享变量的新值来实现【比较并交换】的效果
②为什么无锁效率高?
- 无锁情况下,即使重试失败,线程始终在高速运行,没有停歇,而 synchronized 会让线程在没有获得锁的时 候,发生上下文切换,进入阻塞。打个比喻
- 线程就好像高速跑道上的赛车,高速运行时,速度超快,一旦发生上下文切换,就好比赛车要减速、熄火, 等被唤醒又得重新打火、启动、加速... 恢复到高速运行,代价比较大
- 但无锁情况下,因为线程要保持运行,需要额外 CPU 的支持,CPU 在这里就好比高速跑道,没有额外的跑 道,线程想高速运行也无从谈起,虽然不会进入阻塞,但由于没有分到时间片,仍然会进入可运行状态,还 是会导致上下文切换。
③CAS与synchronized比较
CAS
CAS 是基于乐观锁的思想:乐观的估计,不怕别的线程来修改共享变量,就算改了也没关系,我吃亏点再 重试呗。 每次不加锁而是假设没有冲突而去完成某项操作,如果因为冲突失败就重试,直到成功为止。
CAS 体现的是无锁并发、无阻塞并发,请仔细体会这两句话的意思
- 因为没有使用 synchronized,所以线程不会陷入阻塞,这是效率提升的因素之一
- 但如果竞争激烈,可以想到重试必然频繁发生,反而效率会受影响
synchronized
synchronized 是基于悲观锁的思想:悲观的估计,得防着其它线程来修改共享变量,我上了锁你们都别想 改,我改完了解开锁,你们才有机会。 这种线程一旦得到锁,其他需要锁的线程就挂起的情况就是悲观锁。每次的锁的占用都会阻塞其他线程。导致线程上下文的切换,进行加解锁的时候效率会降低。
④CAS使用场景
获取共享变量时,为了保证该变量的可见性,需要使用 volatile 修饰。结合 CAS 和 volatile 可以实现无锁并发,适用于竞争不激烈、多核 CPU 的场景下。
⑤悲观锁乐观锁比较
定义: 悲观锁(Pessimistic Lock): 每次获取数据的时候,都会担心数据被修改,所以每次获取数据的时候都会进行加锁,确保在自己使用的过程中数据不会被别人修改,使用完成后进行数据解锁。由于数据进行加锁,期间对该数据进行读写的其他线程都会进行等待。
乐观锁(Optimistic Lock): 每次获取数据的时候,都不会担心数据被修改,所以每次获取数据的时候都不会进行加锁,但是在更新数据的时候需要判断该数据是否被别人修改过。如果数据被其他线程修改,则不进行数据更新,如果数据没有被其他线程修改,则进行数据更新。由于数据没有进行加锁,期间该数据可以被其他线程进行读写操作。
适用场景: 悲观锁:比较适合写入操作比较频繁的场景,如果出现大量的读取操作,每次读取的时候都会进行加锁,这样会增加大量的锁的开销,降低了系统的吞吐量。
乐观锁:比较适合读取操作比较频繁的场景,如果出现大量的写入操作,数据发生冲突的可能性就会增大,为了保证数据的一致性,应用层需要不断的重新获取数据,这样会增加大量的查询操作,降低了系统的吞吐量。
总结:两种所各有优缺点,读取频繁使用乐观锁,写入频繁使用悲观锁
3、原子整数
java.util.concurrent.atomic下的
AtomicInteger
AtomicBoolean
AtomicLong
以AtomicInteger为例:
public class T2 {
public static void main(String[] args) {
// 初始化值为0
AtomicInteger i = new AtomicInteger(1);
// 先获取再自增1 返回:1
System.out.println(i.getAndIncrement()); //i++
// 先自增1再获取 返回:3
System.out.println(i.incrementAndGet()); //++i
// 先自减1再获取 返回:2
System.out.println(i.decrementAndGet());// --i;
// 先获取再自减1 返回:2
System.out.println(i.getAndDecrement());// i--;
// 直接获取值 返回:1
System.out.println(i.get());
// 加上指定的值大小(先获取,再加) 返回:1 结果为11
System.out.println(i.getAndAdd(10));
// 加上指定的值大小(先增加,再获取)
// i.addAndGet(10)
// 对指定的值进行更新操作 IntUnaryOperator函数式接口提供一个int类型参数,int类型的返回值的方法
// 先更新后获取 返回:22 结果:22
System.out.println(i.updateAndGet(value -> value * 2));
// 对指定的值进行更新操作
// 先获取后更新 返回22 结果2200
System.out.println(i.getAndUpdate(value -> value * 100));
// 先计算再获取 IntBinaryOperator函数式接口提供两个int类型参数 第一个为值i为2200,第二个为传来的参数200,返回一个int类型
// 返回2000 结果为2000
System.out.println(i.accumulateAndGet(200, (p, x) -> p - x));
}
}
即我们可以将上述取钱的操作修改:
@Override
public void withdraw(Integer amount) {
// //使用cas操作
// while (true) {
// //获取最新的余额
// int pre=balance.get();
// //取钱之后的余额
// int next=pre-amount;
// //期望最新值 操作完之后的值
// if(balance.compareAndSet(pre, next)){
// break;
// }
// }
//取钱加一个负数即可
balance.addAndGet(-1*amount);
}
手动实现UpdateAndGet方法:
public class T3 {
public static void main(String[] args) {
AtomicInteger i = new AtomicInteger(10);
System.out.println(updateAndGet(i, value -> value * 10));
}
static int updateAndGet(AtomicInteger i, IntUnaryOperator operator){
while (true) {
// 获取最新的值
int pre = i.get();
int next = operator.applyAsInt(pre);
// 调用cas操作
if(i.compareAndSet(pre, next)){
return next;
}
}
}
}
AtomicInteger的UpdateAndGet方法源码:
public final int updateAndGet(IntUnaryOperator updateFunction) {
int prev, next;
do {
prev = get();
next = updateFunction.applyAsInt(prev);
} while (!compareAndSet(prev, next));
return next;
}
4、原子引用
AtomicReference
AtomicMarkableReference
AtomicStampedReference
①AtomicReference<V>的使用
对引用类型进行原子操作
取款问题,若存取的操作为引用类型。则需要AtomicReference保护该类型
账户类
interface AccountR {
//获取余额
BigDecimal getBalance();
//取款
void withdraw(BigDecimal amount);
static void demo(AccountR account) {
List<Thread> list = new ArrayList<>();
long start = System.currentTimeMillis();
for (int i = 0; i < 1000; i++) {
list.add(new Thread(() -> {
account.withdraw(new BigDecimal(String.valueOf(10)));
}));
}
list.forEach(Thread::start);
list.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long end = System.currentTimeMillis();
System.out.println(account.getBalance() + " cost:" + (end - start)+"ms");
}
}
实现类
public class T4 {
public static void main(String[] args) {
DecimalAccountCas cas=new DecimalAccountCas(new BigDecimal(String.valueOf(10000)));
AccountR.demo(cas);
}
}
class DecimalAccountCas implements AccountR{
// 原子引用 <要保护的类型>
private AtomicReference<BigDecimal> reference;
public DecimalAccountCas(BigDecimal balance) {
this.reference = new AtomicReference<>(balance);
}
@Override
public BigDecimal getBalance() {
return reference.get();
}
@Override
public void withdraw(BigDecimal amount) {
while (true) {
BigDecimal pre=reference.get();
//subtract减操作
BigDecimal next=pre.subtract(amount);
// 使用cas
if (reference.compareAndSet(pre, next)) {
break;
}
}
}
}
0 cost:159ms
②ABA问题及其解决
ABA问题
@Slf4j
public class T5 {
static AtomicReference<String> ref=new AtomicReference<>("A");
public static void main(String[] args) throws InterruptedException {
log.info("main start...");
// 获取值
String value = ref.get();
//其他线程对变量修改
other();
Thread.sleep(1000);
// 尝试修改为
// 能否判断该变量value被其他线程修改过?
log.info("change A->C {}",ref.compareAndSet(value, "C"));
}
static void other() throws InterruptedException {
new Thread(()->{
log.info("change A->B {}",ref.compareAndSet(ref.get(),"B"));
},"t1").start();
Thread.sleep(500);
new Thread(()->{
log.info("change B->A {}",ref.compareAndSet(ref.get(),"A"));
},"t2").start();
}
}
19:04:04.348 [main] INFO org.lc.cas.T5 - main start...
19:04:04.386 [t1] INFO org.lc.cas.T5 - change A->B true
19:04:04.886 [t2] INFO org.lc.cas.T5 - change B->A true
19:04:05.886 [main] INFO org.lc.cas.T5 - change A->C true
我们发现,在主线程main执行cas操作的时候,并不能感知其他线程对其值value修改过的操作。只是表面上的一致。中间操作还是被其他线程修改过。
主线程仅能判断出共享变量的值与初值 A 是否相同,不能感知到这种从 A 改为 B 又 改回 A 的情况,如果主线程 希望:
只要有其它线程【动过了】共享变量,那么自己的 cas 就算失败,这时,仅比较值是不够的,需要再加一个版本号
③AtomicStampedReference<V>解决ABA问题(加版本号)
保证期望值匹配和版本号匹配CAS才能成功
@Slf4j
public class T6 {
//赋初值的时候加版本号
static AtomicStampedReference<String> ref=new AtomicStampedReference<>("A",0);
public static void main(String[] args) throws InterruptedException {
log.info("main start...");
// 获取值
String value = ref.getReference();
// 获取版本号
int stamp = ref.getStamp();
log.info("值为:{},版本号为:{}",value,stamp);
//其他线程对变量修改
other();
Thread.sleep(1000);
log.info("值为:{},版本号为:{}",value,stamp);
// 尝试修改为
// 期望的值;新值;期望的版本号;新的版本号
// stamp为0,但是other中的方法对已经对stamp进行了修改为2,版本号不匹配,即此CAS操作失败
log.info("change A->C {}",ref.compareAndSet(value,"C",stamp, stamp+1));
// 输出为A. 是other中的线程t2的cas操作的结果
System.out.println(ref.getReference());
}
static void other() throws InterruptedException {
new Thread(()->{
String value=ref.getReference();
int stamp=ref.getStamp();
log.info("值为:{},版本号为:{}",value,stamp);
log.info("change A->B {}",ref.compareAndSet(value,"B",ref.getStamp(),stamp+1));
},"t1").start();
Thread.sleep(500);
new Thread(()->{
String value=ref.getReference();
int stamp=ref.getStamp();
log.info("值为:{},版本号为:{}",value,stamp);
log.info("change A->B {}",ref.compareAndSet(value,"A",ref.getStamp(),stamp+1));
},"t2").start();
}
}
19:24:01.934 [main] INFO org.lc.cas.T6 - main start...
19:24:01.936 [main] INFO org.lc.cas.T6 - 值为:A,版本号为:0
19:24:01.974 [t1] INFO org.lc.cas.T6 - 值为:A,版本号为:0
19:24:01.974 [t1] INFO org.lc.cas.T6 - change A->B true
19:24:02.474 [t2] INFO org.lc.cas.T6 - 值为:B,版本号为:1
19:24:02.474 [t2] INFO org.lc.cas.T6 - change A->B true
19:24:03.475 [main] INFO org.lc.cas.T6 - 值为:A,版本号为:0
19:24:03.475 [main] INFO org.lc.cas.T6 - change A->C false
A
AtomicStampedReference 可以给原子引用加上版本号,追踪原子引用整个的变化过程,如: A -> B -> A -> C ,通过AtomicStampedReference,我们可以知道,引用变量中途被更改了几次。但是有时候,并不关心引用变量更改了几次,只是单纯的关心是否更改过,所以就有了 AtomicMarkableReference
④AtomicMarkableReference<V>解决ABA问题(true/false标记)
保证期望值匹配和标志匹配CAS才能成功
垃圾袋类
//垃圾袋
class GarbageBag{
String desc;
public GarbageBag(String desc) {
this.desc = desc;
}
public GarbageBag setDesc(String desc) {
this.desc = desc;
return this;
}
@Override
public String toString() {
return "GarbageBag{" +
"desc='" + desc + '\'' +
'}';
}
}
实现操作
@Slf4j
public class T7 {
public static void main(String[] args) throws InterruptedException {
GarbageBag bag=new GarbageBag("这是一个装满了垃圾的垃圾袋");
// 初始化并标记 标记为true,这里只是作为一个标记,其他线程修改该值时,将此标记改为false
AtomicMarkableReference<GarbageBag> reference=new AtomicMarkableReference<>(bag, true);
log.info("主线程 start...");
GarbageBag prev = reference.getReference();
log.info(prev.toString());
Thread.sleep(1000);
log.info("想换一只垃圾袋?");
//CAS操作
//参数: 期望的值;修改后的值;期望的标记;修改后的标记
//先匹配prev的值是否被修改,再匹配第三个参数期望的标记是否为初始化时的标记为true.
//都匹配,则cas成功。否则失败
boolean result = reference.compareAndSet(prev, new GarbageBag("新的垃圾袋"), true, false);
log.info("换成功没? {}",result);
}
}
19:51:28.716 [main] INFO org.lc.cas.T7 - 主线程 start...
19:51:28.718 [main] INFO org.lc.cas.T7 - GarbageBag{desc='这是一个装满了垃圾的垃圾袋'}
19:51:29.719 [main] INFO org.lc.cas.T7 - 想换一只垃圾袋?
19:51:29.719 [main] INFO org.lc.cas.T7 - 换成功没? true
当其他线程对其prev修改时,主线程能否CAS成功?
@Slf4j
public class T7 {
public static void main(String[] args) throws InterruptedException {
GarbageBag bag=new GarbageBag("这是一个装满了垃圾的垃圾袋");
// 初始化并标记
AtomicMarkableReference<GarbageBag> reference=new AtomicMarkableReference<>(bag, true);
log.info("主线程 start...");
GarbageBag prev = reference.getReference();
log.info(prev.toString());
//此线程就是修改了prev的desc的变量值
new Thread(()->{
GarbageBag bag1 = reference.getReference();
log.info("打扫卫生的线程 start...");
bag1.setDesc("垃圾已打扫,这是一个空垃圾袋");
//cas成功
while (!reference.compareAndSet(bag1,bag1,true,false)){ }
log.info(bag.toString());
},"t1").start();
Thread.sleep(1000);
log.info("想换一只垃圾袋?");
//cas失败 因为pre已经被修改,且新的期望值已经变为false.
boolean result = reference.compareAndSet(prev, new GarbageBag("新的垃圾袋"), true, false);
log.info("换成功没? {}",result);
}
}
19:59:31.901 [main] INFO org.lc.cas.T7 - 主线程 start...
19:59:31.903 [main] INFO org.lc.cas.T7 - GarbageBag{desc='这是一个装满了垃圾的垃圾袋'}
19:59:31.947 [t1] INFO org.lc.cas.T7 - 打扫卫生的线程 start...
19:59:31.947 [t1] INFO org.lc.cas.T7 - GarbageBag{desc='垃圾已打扫,这是一个空垃圾袋'}
19:59:32.948 [main] INFO org.lc.cas.T7 - 想换一只垃圾袋?
19:59:32.948 [main] INFO org.lc.cas.T7 - 换成功没? false
5、原子数组
AtomicIntegerArray
AtomicLongArray
AtomicReferenceArray
①AtomicIntegerArray的使用
对数组的每个索引处进行10000的累加操作。期望结果:[10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000]
/**
*数组测试
* @param arraySupplier 提供数组,可以是线程不安全数组或线程安全数组
* @param lengthFun 获取数组长度的方法
* @param putConsume 自增方法,回传array,index
* @param printConsume 打印数组的方法
* @param <T>类型定义
*/
private static <T> void demo(
// T get();
Supplier<T> arraySupplier,
// R apply(T t);
Function<T,Integer> lengthFun,
// void accept(T t, U u);
BiConsumer<T,Integer> putConsume,
// void accept(T t);
Consumer<T> printConsume){
List<Thread> ts=new ArrayList<>();
// 获取数组
T array = arraySupplier.get();
// 获取数组长度
Integer length = lengthFun.apply(array);
// 循环10次
for (int i = 0; i < length; i++) {
//每个线程对数组10000次操作
ts.add(new Thread(()->{
//对每个索引的位置进行1000的累加操作
for (int j = 0; j < 10000; j++) {
//数组; 得到 0,1,2,3,4,5,6,7,8,9索引
putConsume.accept(array, j % length);
}
}));
}
ts.forEach(Thread::start);
ts.forEach(thread -> {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
//打印数组
printConsume.accept(array);
}
}
非安全操作:
public static void main(String[] args) {
demo(
//传递长度为10的int数组
()->new int[10],
//传入数组长度
(array->array.length),
//对数组中的每个索引位10000次加1操作
//array[index]++,这里可能存在指令的交错。导致线程不安全
(array,index)->array[index]++,
//打印操作完后的数组
array-> System.out.println(Arrays.toString(array))
);
}
[9312, 9230, 9238, 9229, 9222, 9210, 9291, 9263, 9247, 9230]
安全操作:
public static void main(String[] args) {
demo(
//传递长度为10的int数组
()->new AtomicIntegerArray(10),
//传入数组长度
(array->array.length()),
//对数组中的每个索引位10000次加1操作
(array,index)->array.getAndIncrement(index),
//打印操作完后的数组
array-> System.out.println(array)
);
}
[10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000]
6、字段更新器
我们只能对对象进行原子保护,但是不能对对象中的字段进行保护。因为对象中属性的改变并不会导致期望值的改变。所以我们需要字段更新器
根据要修改的字段类型,决定使用什么类型:
AtomicReferenceFieldUpdater
AtomicIntegerFieldUpdater
AtomicLongFieldUpdater
①AtomicReferenceFieldUpdater的使用
注意:要操作的字段必须加volatile
public class T10 {
public static void main(String[] args) throws InterruptedException {
AtomicReferenceFieldUpdater fieldUpdater=AtomicReferenceFieldUpdater.newUpdater(Student.class,String.class, "name");
Student student=new Student();
new Thread(()->{
//下面的cas成功
student.setName("李四");
//下面的cas失败
//student.setName("张三");
},"t1").start();
Thread.sleep(500);
// cas操作:将对象字段的预期值和操作的对象中的值进行比较。如果一致则cas成功,否则失败
//当前操作的对象; 对象字段的预期值(原始值); 更新自动后的值
System.out.println(fieldUpdater.compareAndSet(student, "李四", "张三"));
System.out.println(student.toString());
}
}
class Student{
//这里必须要配合volatile操作
volatile String name;
public String getName() {
return name;
}
public Student setName(String name) {
this.name = name;
return this;
}
@Override
public String toString() {
return "Student{" +
"name='" + name + '\'' +
'}';
}
}
7、原子累加器
LongAdder
①比较 AtomicLong 与 LongAdder
public class T11 {
public static void main(String[] args) {
//20000000 cost:327
demo(() -> new AtomicLong(), adder -> adder.getAndIncrement());
//20000000 cost:49
demo(() -> new LongAdder(), adder -> adder.increment());
}
private static <T> void demo(Supplier<T> adderSupplier, Consumer<T> action) {
T adder = adderSupplier.get();
long start = System.nanoTime();
List<Thread> ts = new ArrayList<>();
// 40个线程 每个线程累加50万
for (int i = 0; i < 40; i++) {
ts.add(new Thread(() -> {
for (int j = 0; j < 500000; j++) {
action.accept(adder);
}
}));
}
ts.forEach(t -> t.start());
ts.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long end = System.nanoTime();
System.out.println(adder + " cost:" + (end - start) / 1000_000);
}
}
性能提升的原因很简单,就是在有竞争时,设置多个累加单元,Therad-0 累加 Cell[0],而 Thread-1 累加 Cell[1]... 后将结果汇总。这样它们在累加时操作的不同的 Cell 变量,因此减少了 CAS 重试失败,从而提高性能。
Ⅵ、共享模型之不可变
1、日期转换出现的线程安全问题
public class T1 {
public static void main(String[] args) {
SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
for (int i = 0; i < 10; i++) {
new Thread(()->{
try {
System.out.println(sdf.parse("2019-1-1 11:11:11"));
} catch (ParseException e) {
e.printStackTrace();
}
}).start();
}
}
}
Exception in thread "Thread-5" java.lang.NumberFormatException: multiple points
at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1890)
at sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110)
at java.lang.Double.parseDouble(Double.java:538)
at java.text.DigitList.getDouble(DigitList.java:169)
......
Tue Jan 01 11:11:11 GMT+08:00 2019
Tue Jan 01 11:11:11 GMT+08:00 2019
多线程同时操作同一个日期转换对象可能出现异常
①加锁synchronized
效率比较低
②使用不可变类
public class T1 {
public static void main(String[] args) {
DateTimeFormatter sdf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
for (int i = 0; i < 10; i++) {
new Thread(()->{
try {
System.out.println(sdf.parse("2019-01-01 11:11:11",(LocalDateTime::from)));
//等价->
// System.out.println(sdf.parse("2019-01-01 11:11:11", (TemporalAccessor temporal) -> LocalDateTime.from(temporal)));
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
}
* @implSpec
* This class is immutable and thread-safe. //类不可变且是线程安全的
public final class DateTimeFormatter {
//...
}
2、不可变类的设计
另一个大家更为熟悉的 String 类也是不可变的,以它为例,说明一下不可变设计的要素
public final class String
implements java.io.Serializable, Comparable<String>, CharSequence {
//....
}
①final的使用
发现该类、类中所有属性都是 final 的
- 属性用 final 修饰保证了该属性是只读的,不能修改
- 类用 final 修饰保证了该类中的方法不能被覆盖,防止子类无意间破坏不可变性
②保护性拷贝
例如String中的substring 方法
public String substring(int beginIndex) {
if (beginIndex < 0) {
throw new StringIndexOutOfBoundsException(beginIndex);
}
int subLen = value.length - beginIndex;
if (subLen < 0) {
throw new StringIndexOutOfBoundsException(subLen);
}
//实际new了一个新的对象
return (beginIndex == 0) ? this : new String(value, beginIndex, subLen);
}
通过创建副本对象来避 免共享的手段称之为【保护性拷贝(defensive copy)】
3、享元模式
定义 英文名称:Flyweight pattern. 当需要重用数量有限的同一类对象时
①包装类体现
在JDK中 Boolean,Byte,Short,Integer,Long,Character 等包装类提供了 valueOf 方法,例如 Long 的 valueOf 会缓存 -128~127 之间的 Long 对象,在这个范围之间会重用对象,大于这个范围,才会新建 Long 对 象:
Long对象
public static Long valueOf(long l) {
final int offset = 128;
if (l >= -128 && l <= 127) { // will cache
return LongCache.cache[(int)l + offset];
}
return new Long(l);
}
注意:
- Byte, Short, Long 缓存的范围都是 -128~127
- Character 缓存的范围是 0~127
- Integer的默认范围是 -128~127
- 最小值不能变
- 但最大值可以通过调整虚拟机参数
-Djava.lang.Integer.IntegerCache.high
来改变- Boolean 缓存了 TRUE 和 FALSE
②BigInteger和BigDecimal体现
我们可以发现在BigDecimal进行add操作时,也是通过new一个BigDecimal进行的累加操作,不存在线程安全问题。但是多个方法同时操作,多线程的情况下则存在线程安全问题。
③实现简单的享元模式(数据库连接池)
例如:一个线上商城应用,QPS(每秒查询率(QPS,Queries-per-second)是对一个特定的查询服务器在规定时间内所处理流量多少的衡量标准。) 达到数千,如果每次都重新创建和关闭数据库连接,性能会受到极大影响。 这时 预先创建好一批连接,放入连接池。一次请求到达后,从连接池获取连接,使用完毕后再还回连接池,这样既节约 了连接的创建和关闭时间,也实现了连接的重用,不至于让庞大的连接数压垮数据库
连接对象
//模拟连接对象实现
class MockConnection implements Connection{
private String connectionName;
@Override
public String toString() {
return "MockConnection{" +
"connectionName='" + connectionName + '\'' +
'}';
}
//其他代码 略
}
连接池
@Slf4j
class Pool{
// 连接池大小
private final int poolSize;
// 连接对象数组
private Connection[] connections;
// 连接状态数组 0空闲 1表示繁忙
private AtomicIntegerArray states;
// 构造方法初始化
public Pool(int poolSize) {
this.poolSize = poolSize;
this.connections = new Connection[poolSize];
this.states=new AtomicIntegerArray(new int[poolSize]);
for (int i = 0; i < poolSize; i++) {
// 初始化所有连接
connections[i]=new MockConnection("连接"+(i+1));
}
}
// 借连接
public Connection borrow() {
while (true) {
for (int i = 0; i < poolSize; i++) {
// 获取空闲连接
// 使用cas操作 将该位置的连接更新为1 表示该位置的连接已被占用
if(states.compareAndSet(i, 0, 1)){
// 获得连接
log.info("借到连接 {}",connections[i]);
return connections[i];
}
}
// 如果遍历完所有的连接池中的连接 还没有获得连接
synchronized (this){
try {
// 等待连接
log.info("没有空闲连接,请等待...");
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
* 归还连接
* @param connection 归还的连接对象
*/
public void free(Connection connection){
for (int i = 0; i < poolSize; i++) {
// 找到归还的连接对象
if(connections[i]==connection){
// 这里无需cas ,应为传过来的Conncetion对象已确定
states.set(i, 0);
synchronized (this) {
log.info("已释放该连接{}...",connection);
this.notify();
}
break;
}
}
}
}
测试
public class T3 {
public static void main(String[] args) {
Pool pool = new Pool(2);
for (int i = 0; i < 5; i++) {
new Thread(()->{
Connection con = pool.borrow();
try {
//模拟 随机1秒内的执行时间
Thread.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
pool.free(con);
},""+(i+1)).start();
}
}
}
21:00:48.700 [3] INFO org.lc.flyweight_pattern.Pool - 没有空闲连接,请等待...
21:00:48.700 [1] INFO org.lc.flyweight_pattern.Pool - 借到连接 MockConnection{connectionName='连接1'}
21:00:48.703 [4] INFO org.lc.flyweight_pattern.Pool - 没有空闲连接,请等待...
21:00:48.703 [5] INFO org.lc.flyweight_pattern.Pool - 没有空闲连接,请等待...
21:00:48.700 [2] INFO org.lc.flyweight_pattern.Pool - 借到连接 MockConnection{connectionName='连接2'}
21:00:49.183 [1] INFO org.lc.flyweight_pattern.Pool - 已释放该连接MockConnection{connectionName='连接1'}...
21:00:49.183 [3] INFO org.lc.flyweight_pattern.Pool - 借到连接 MockConnection{connectionName='连接1'}
21:00:49.668 [2] INFO org.lc.flyweight_pattern.Pool - 已释放该连接MockConnection{connectionName='连接2'}...
21:00:49.669 [4] INFO org.lc.flyweight_pattern.Pool - 借到连接 MockConnection{connectionName='连接2'}
21:00:49.748 [3] INFO org.lc.flyweight_pattern.Pool - 已释放该连接MockConnection{connectionName='连接1'}...
21:00:49.748 [5] INFO org.lc.flyweight_pattern.Pool - 借到连接 MockConnection{connectionName='连接1'}
21:00:49.752 [4] INFO org.lc.flyweight_pattern.Pool - 已释放该连接MockConnection{connectionName='连接2'}...
21:00:50.464 [5] INFO org.lc.flyweight_pattern.Pool - 已释放该连接MockConnection{connectionName='连接1'}...
4、final原理
①设置final变量的原理
理解了 volatile 原理,再对比 final 的实现就比较简单了
public class TestFinal {
final int a = 20;
}
字节码:
0: aload_0
1: invokespecial #1 // Method java/lang/Object."<init>":()V
4: aload_0
5: bipush 20
7: putfield #2 // Field a:I
<-- 写屏障
10: return
发现 final 变量的赋值也会通过 putfield 指令来完成,同样在这条指令之后也会加入写屏障**,保证在其它线程读到 它的值时不会出现为 0 的情况**
线程安全问题:当多线操作int a= 20时, a变量的赋值情况为先初始化0 ,然后再赋值20,再次过程中可能线程读到的为0,所以导致线程安全问题。加final修饰的变量可以解决此问题。
Ⅶ 、共享模型之工具
1、自定义线程池
ThreadPool线程池:包含核心的线程数(总共的线程大小),若任务的个数超过核心线程数的时候,会将多余的任务存入阻塞队列中的,等待线程池中的线程执行当前任务后,再从阻塞队列中获取。
Blocking Queue阻塞队列:存放多余任务的地方,由主线程提供任务,线程池消费阻塞队列中的任务。
①自定义拒绝策略接口
为阻塞队列之外的任务提供策略的抽象方法
/**
* 拒绝策略接口
* 解决阻塞队列的任务满的时候,对多余的任务的处理的方式
*/
@FunctionalInterface
interface RejectPolicy<T>{
/**
* 策略抽象
* @param queue 阻塞队列
* @param task 任务
*/
void reject(BlockingQueue<T> queue,T task);
}
②自定义阻塞任务队列
@Slf4j
class BlockingQueue<T>{
// 1、任务队列 存放任务 双向队列 (先进先出)
private Deque<T> queue=new ArrayDeque<>();
// 2、锁 保证任务队列里的一个任务只有一个线程执行
private ReentrantLock lock=new ReentrantLock();
// 3、生产者条件变量 保证队列满的时候,生产者不再生产任务
private Condition fullWaitSet=lock.newCondition();
// 4、消费者条件变量 保证队列空的时候,线程不再消费任务
private Condition emptyWaitSet=lock.newCondition();
// 5、容量
private int capcity;
public BlockingQueue(int capcity) {
this.capcity = capcity;
}
// 带超时的阻塞获取 消费者获取任务
public T pull(long timeout, TimeUnit unit) {
lock.lock();
try {
//我们传入的超时时间转换为纳秒
long nanos = unit.toNanos(timeout);
//队列是否为空
while (queue.isEmpty()) {
try {
//返回的剩余时间小于0 证明已经超时
if (nanos < 0) {
// 无需等待直接返回
return null;
}
// 返回的值为:剩余等待的时间 = 设置的超时时间 - 已经等待的时间
nanos=emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//不为空 开始获取任务
//从任务队列中移除第一个任务并返回该任务
T task=queue.removeFirst();
// 唤醒生成者生产任务
fullWaitSet.signal();
return task;
}finally {
lock.unlock();
}
}
// 阻塞获取 消费者获取任务
public T take() {
lock.lock();
try {
//队列是否为空
while (queue.isEmpty()) {
try {
//为空等待
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//不为空 开始获取任务
//从任务队列中移除第一个任务并返回该任务
T task=queue.removeFirst();
// 唤醒生成者生产任务
fullWaitSet.signal();
return task;
}finally {
lock.unlock();
}
}
// 阻塞添加 生成者生产任务
public void put(T task) {
lock.lock();
try {
//队列中的任务数是否和容量相等
while (queue.size() == capcity) {
// 容量已满 阻塞
try {
log.info("等待加入任务队列{}",task);
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.info("加入任务队列{}",task);
//没有满 添加任务
queue.addLast(task);
//唤醒消费者线程
emptyWaitSet.signal();
}finally {
lock.unlock();
}
}
// 带超时时间的阻塞添加 如果在指定的超时时间内该任务还没有进入队列则放弃执行该任务
public boolean offer(T task,long timeout,TimeUnit timeUnit) {
lock.lock();
try {
long nanos = timeUnit.toNanos(timeout);
//队列中的任务数是否和容量相等
while (queue.size() == capcity) {
// 容量已满 阻塞
try {
if (nanos < 0) {
return false;
}
log.info("等待加入任务队列{}",task);
nanos=fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.info("加入任务队列{}",task);
//没有满 添加任务
queue.addLast(task);
//唤醒消费者线程
emptyWaitSet.signal();
return true;
}finally {
lock.unlock();
}
}
// 获取阻塞队列中任务数
public int size() {
lock.lock();
try {
return queue.size();
}finally {
lock.unlock();
}
}
/**
* 策略模式对阻塞队列已满的抽象处理
* @param rejectPolicy 策略对象
* @param task 任务
*/
public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
lock.lock();
try {
// 阻塞队列已满
if (queue.size() == capcity) {
//具体策略由调用者实现
rejectPolicy.reject(this,task);
}else{
//有空闲
log.info("加入任务队列{}",task);
//没有满 添加任务
queue.addLast(task);
//唤醒消费者线程
emptyWaitSet.signal();
}
}finally {
lock.unlock();
}
}
}
③自定义线程池
/**
* 线程池
*/
@Slf4j
class ThreadPool{
// 任务队列
BlockingQueue<Runnable> taskQueue;
// 线程集合
private HashSet<Worker> workes=new HashSet();
// 核心的线程数(即线程池的线程总数)
private int coreSize;
// 获取任务的超时时间
private long timeout;
// 时间单位
private TimeUnit timeUnit;
/**
* 拒绝策略对象
*/
private RejectPolicy<Runnable> rejectPolicy;
/**
* 线程池中任务的创建
* @param task 任务对象
*/
public void execute(Runnable task){
// 当任务数没有超过coreSize时,直接交给Worker对象执行
// 当任务数超过了的coreSize是,加入任务队列暂存
// 当线程数还不够时,小于核心线程数时 该集合操作并不安全 需要加锁
synchronized (workes) {
if (workes.size() < coreSize) {
//新建一个线程执行此任务
Worker worker = new Worker(task);
log.info("新增worker{}, {}",worker, task);
// 加入到线程集合中
workes.add(worker);
// 执行任务 线程中的run执行任务中的run方法
worker.start();
} else {
// 队列已满 选择死等
// taskQueue.put(task);
// 策略模式的使用:具体实现由调用者选择。
// 当阻塞队列任务满的时候,执行的策略有很多种
// 1>死等
// 2>超时等待
// 3>让调用者放弃任务执行
// 4>让调用者抛出异常
// 5>让调用者抛出异常
// 这里我们通过策略模式来实现。
taskQueue.tryPut(rejectPolicy,task);
}
}
}
/**
* 初始化线程池
* @param coreSize 核心线程数
* @param timeout 超时时间
* @param timeUnit 超时时间单位
* @param queueCapcity 阻塞队列容量
* @param rejectPolicy 拒绝策略的实现对象
*/
public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit,int queueCapcity,RejectPolicy<Runnable> rejectPolicy) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.rejectPolicy=rejectPolicy;
this.taskQueue=new BlockingQueue<>(queueCapcity);
}
/**
* 自定义worker包装我们的线程类
*/
class Worker extends Thread{
private Runnable task;
public Worker(Runnable task) {
this.task = task;
}
/**
* worker具体的执行方法
*/
@Override
public void run() {
// ①当task不为空,执行任务
// ②当task执行完毕,再接着从任务队列获取任务并执行
// 判断初始化的时候是否有任务 || 从阻塞队列中获取任务(执行全部任务)
// while (task != null || (task=taskQueue.take())!=null) {
// 获取阻塞队列中任务,使用带超时的获取方式。若等待一段时间,阻塞队列还没有任务则不等待
while (task != null || (task=taskQueue.pull(timeout, timeUnit))!=null) {
try {
log.info("正在执行...{}",task);
//相当于执行Runnable中的普通run方法
task.run();
} catch (Exception e) {
e.printStackTrace();
}finally {
//执行完毕后 将初始化的任务设为null
task=null;
}
}
// 所有阻塞的任务和初始化的任务都被执行完毕 移除线程
synchronized (workes) {
log.info("worker被移除{}",this);
//移除当前线程
workes.remove(this);
}
}
}
}
④测试
@Slf4j
public class TestPool {
public static void main(String[] args) {
ThreadPool threadPool=new ThreadPool(1, 1000, TimeUnit.MILLISECONDS, 1,((queue,task)->{
// 策略模式测试:
// 1、实现 死等
// queue.put(task);
// 2、带超时的等待 设定的超时时间小于任务的执行时间则该任务放弃。
// queue.offer(task, 1500, TimeUnit.MILLISECONDS);
// 3、让调用者放弃任务执行
// log.info("放弃任务{}",task);
// 4、让调用者抛出异常
// throw new RuntimeException("任务执行失败"+task);
// 5、让调用者自己去执行任务
task.run();
}));
// 当需要执行的任务超过阻塞队列的长度时 由调用者来实现我们的策略(即真正执行的方法)
for (int i = 0; i < 3; i++) {
int j=i;
threadPool.execute(()->{
try {
//模拟执行很长
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("{}",j);
});
}
}
}
16:10:10.673 [main] INFO org.lc.Thread_pool.ThreadPool - 新增workerThread[Thread-0,5,main], org.lc.Thread_pool.TestPool$$Lambda$2/1030870354@22927a81
16:10:10.676 [main] INFO org.lc.Thread_pool.BlockingQueue - 加入任务队列org.lc.Thread_pool.TestPool$$Lambda$2/1030870354@20fa23c1
16:10:10.676 [Thread-0] INFO org.lc.Thread_pool.ThreadPool - 正在执行...org.lc.Thread_pool.TestPool$$Lambda$2/1030870354@22927a81
16:10:11.676 [main] INFO org.lc.Thread_pool.TestPool - 2
16:10:11.676 [Thread-0] INFO org.lc.Thread_pool.TestPool - 0
16:10:11.676 [Thread-0] INFO org.lc.Thread_pool.ThreadPool - 正在执行...org.lc.Thread_pool.TestPool$$Lambda$2/1030870354@20fa23c1
16:10:12.677 [Thread-0] INFO org.lc.Thread_pool.TestPool - 1
16:10:13.678 [Thread-0] INFO org.lc.Thread_pool.ThreadPool - worker被移除Thread[Thread-0,5,main]
2、ThreadPoolExecutor
package java.util.concurrent;
①线程池状态
ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量
从数字上比较,TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING(111 为负数)
这些信息存储在一个原子变量 ctl 中,目的是将线程池状态与线程个数合二为一,这样就可以用一次 cas 原子操作 进行赋值
// c 为旧值, ctlOf 返回结果为新值
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))));
// rs 为高 3 位代表线程池状态, wc 为低 29 位代表线程个数,ctl 是合并它们
private static int ctlOf(int rs, int wc) { return rs | wc; }
②构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
}
corePoolSize 核心线程数目 (最多保留的线程数,核心线程不会超时)
maximumPoolSize 最大线程数据(救急线程数+核心线程数)
keepAliveTime 生存时间 - 针对救急线程
TimeUnit 时间单位 - 针对救急线程
workQueue 阻塞队列
threadFactory 线程工厂 - 可以为线程创建时起个好名字
handler 拒绝策略(救急线程用完,阻塞队列已满,会执行该策略)
1)工作方式:
线程池中刚开始还没有线程,当一个任务提交给线程池后,线程池会创建一个新任务来执行任务
当现场数达到corePoolSize并没有空闲线程,这时再加入任务,新加的任务会被加入workQqueue队列排队,直到有空闲的线程
如果队列选择了有界队列,那么任务超过了队列大小时,会创建
maximumPoolSize - corePoolSize
数目的线 程来救急(阻塞任务队列满的时候,会根据这个线程数目来创建救急线程执行多余的任务)如果线程到达 maximumPoolSize 仍然有新任务这时会执行拒绝策略(救急线程也全部创建完毕,还有其他的任务,则会执行拒绝策略)。拒绝策略 jdk 提供了 4 种实现,其它 著名框架也提供了实现
- AbortPolicy 让调用者抛出 RejectedExecutionException 异常,这是默认策略
- CallerRunsPolicy 让调用者运行任务
- DiscardPolicy 放弃本次任务
- DiscardOldestPolicy 放弃队列中早的任务,本任务取而代之
- ----------------------其他框架实现---------------------------
- Dubbo 的实现,在抛出 RejectedExecutionException 异常之前会记录日志,并 dump 线程栈信息,方 便定位问题
- Netty 的实现,是创建一个新线程来执行任务
- ActiveMQ 的实现,带超时等待(60s)尝试放入队列,类似我们之前自定义的拒绝策略
- PinPoint 的实现,它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略
当高峰过去后,超过corePoolSize 的救急线程如果一段时间没有任务做,需要结束节省资源,这个时间由 keepAliveTime 和 unit 来控制。
③newFixedThreadPool
public class Executors {
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
}
特点:
核心线程数 = 最大线程数(没有救急线程被创建),因此也无需超时时间
阻塞队列是无界的,可以放任意数量的任务
任务执行完毕后,核心线程不会主动结束。
**使用场景:**适用于任务量已知,相对耗时的任务
@Slf4j
public class T1 {
public static void main(String[] args) {
// 创建两个核心线程数
// ExecutorService pool = Executors.newFixedThreadPool(2);
// 创建两个核心线程数 并自定义实现线程工厂
ExecutorService pool = Executors.newFixedThreadPool(2, new ThreadFactory() {
private AtomicInteger t=new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
// 自定义线程名称 使用cas赋值自增名称
return new Thread(r,"mypool_"+t.getAndIncrement());
}
});
pool.execute(()->{
log.info("1");
});
pool.execute(()->{
log.info("2");
});
pool.execute(()->{
log.info("3");
});
}
}
19:46:08.701 [mypool_1] INFO org.lc.Thread_pool.T1 - 1 //线程1 执行任务1
19:46:08.701 [mypool_2] INFO org.lc.Thread_pool.T1 - 2 //线程2 执行任务2
19:46:08.704 [mypool_1] INFO org.lc.Thread_pool.T1 - 3 //线程1 执行任务3
④newCachedThreadPool
public class Executors {
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
}
特定:
- 核心线程数是 0, 最大线程数是 Integer.MAX_VALUE,救急线程的空闲生存时间是 60s,意味着
- 全部都是救急线程(60s 后可以回收)
- 救急线程可以无限创建
- 队列采用了 SynchronousQueue 实现特点是,它没有容量,没有线程来取是放不进去的(一手交钱、一手交 货)
**使用场景:**整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲 1分钟后释放线 程。 适合任务数比较密集,但每个任务执行时间较短的情况
⑤newSingleThreadExecutor
public class Executors {
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
}
使用场景:希望多个任务排队执行。线程数固定为 1,任务数多于 1 时,会放入无界队列排队。任务执行完毕,这唯一的线程 也不会被释放。
区别:
自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一 个线程,保证池的正常工作
Executors.newSingleThreadExecutor() 线程个数始终为1,不能修改
- FinalizableDelegatedExecutorService 应用的是装饰器模式,只对外暴露了 ExecutorService 接口,因 此不能调用 ThreadPoolExecutor 中特有的方法,不能对核心线程数等参数进行修改。
Executors.newFixedThreadPool(1) 初始时为1,以后还可以修改
- 对外暴露的是 ThreadPoolExecutor 对象,可以强转后调用 setCorePoolSize 等方法进行修改
@Slf4j
public class T3 {
public static void main(String[] args) {
ExecutorService pool = Executors.newSingleThreadExecutor();
pool.execute(()->{
int i=1/0; //出现异常的是否 当前线程会放弃该任务
log.info("task 1");
});
pool.execute(()->{
log.info("task 2");
});
pool.execute(()->{
log.info("task 3");
});
}
}
Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
at org.lc.Thread_pool.T3.lambda$main$0(T3.java:20)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
22:50:39.111 [pool-1-thread-2] INFO org.lc.Thread_pool.T3 - task 2
22:50:39.113 [pool-1-thread-2] INFO org.lc.Thread_pool.T3 - task 3
⑥提交任务
1) 执行任务execute
void execute(Runnable command)
2)提交任务submit,用返回值 Future 获得任务执行结果
<T> Future<T> submit(Callable<T> task);
@Slf4j
public class T4 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService pool = Executors.newFixedThreadPool(2);
Future<String> submit = pool.submit(() -> {
log.info("running...");
Thread.sleep(1000);
return "ok";
});
//主线程会阻塞 直到获取到值
log.info(submit.get());
}
}
11:01:09.925 [pool-1-thread-1] INFO org.lc.Thread_pool.T4 - running...
11:01:10.927 [main] INFO org.lc.Thread_pool.T4 - ok
3) 提交所有任务invokeAll
传入一个集合任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
@Slf4j
public class T4 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService pool = Executors.newFixedThreadPool(2);
List<Future<Object>> futures = pool.invokeAll(Arrays.asList(
() -> {
log.info("running1...");
Thread.sleep(1000);
return "1";
},
() -> {
log.info("running2...");
Thread.sleep(500);
return "2";
},
() -> {
log.info("running3...");
Thread.sleep(2000);
return "3";
}
));
futures.forEach((f)->{
try {
log.info("{}",f.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
});
}
}
11:23:50.082 [pool-1-thread-1] INFO org.lc.Thread_pool.T4 - running1...
11:23:50.082 [pool-1-thread-2] INFO org.lc.Thread_pool.T4 - running2...
11:23:50.584 [pool-1-thread-2] INFO org.lc.Thread_pool.T4 - running3...
11:23:52.585 [main] INFO org.lc.Thread_pool.T4 - 1
11:23:52.586 [main] INFO org.lc.Thread_pool.T4 - 2
11:23:52.586 [main] INFO org.lc.Thread_pool.T4 - 3
4)提交所有任务invokeAll,带超时时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException;
5) 提交所有任务invokeAny,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
@Slf4j
public class T4 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService pool = Executors.newFixedThreadPool(1);
//只有1个核心线程时,第一个任务先执行后返回。
//当有3个核心线程时,三个任务并行执行,任务2线程先返回
String s=pool.invokeAny(Arrays.asList(
() -> {
log.info("running1...");
Thread.sleep(1000);
log.info("end1...");
return "1";
},
() -> {
log.info("running2...");
Thread.sleep(500);
log.info("end2...");
return "2";
},
() -> {
log.info("running3...");
Thread.sleep(2000);
log.info("end3...");
return "3";
}
));
log.info("结果:{}",s);
}
}
11:35:45.933 [pool-1-thread-1] INFO org.lc.Thread_pool.T4 - running1...
11:35:46.936 [pool-1-thread-1] INFO org.lc.Thread_pool.T4 - end1...
11:35:46.936 [pool-1-thread-1] INFO org.lc.Thread_pool.T4 - running2...
11:35:46.936 [main] INFO org.lc.Thread_pool.T4 - 结果:1
6)带超时时间的inVokeAny
<T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
⑦关闭线程池
1)shutdown
- 线程池状态变为 SHUTDOWN
- 不会接收新任务
- 但已提交任务会执行完 ,阻塞队列任务会处理完
- 此方法不会阻塞调用线程的执行(即主线程的调用不会等待该方法的执行,继续执行主线程后面的代码)
void shutdown();
public class ThreadPoolExecutor extends AbstractExecutorService {
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//修改线程池状态
advanceRunState(SHUTDOWN);
//仅仅会打断空闲线程
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 尝试终结(没有运行的线程可以立刻终结,如果还有运行的线程也不会等)
tryTerminate();
}
}
2)shutdownNow
- 线程池状态变为stop
- 不会接收新任务
- 会将队列中的任务返回
- 并用 interrupt 的方式中断正在执行的任务
List<Runnable> shutdownNow();
public class ThreadPoolExecutor extends AbstractExecutorService {
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//修改线程池转状态
advanceRunState(STOP);
//打断所有线程
interruptWorkers();
//获取队列中剩余任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}
//尝试终结
tryTerminate();
return tasks;
}
}
3)其他方法
// 不在 RUNNING 状态的线程池,此方法就返回
true boolean isShutdown();
// 线程池状态是否是
TERMINATED boolean isTerminated();
// 调用 shutdown 后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池 TERMINATED 后做些事 情,可以利用此方法等待
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
4)测试
shutdown、awaitTermination
@Slf4j
public class T5 {
public static void main(String[] args) throws InterruptedException {
ExecutorService pool = Executors.newFixedThreadPool(2);
Future<Integer> submit1 = pool.submit(() -> {
log.info("task 1 running...");
Thread.sleep(1000);
log.info("task 1 finish ");
return 1;
});
Future<Integer> submit2 = pool.submit(() -> {
log.info("task 2 running...");
Thread.sleep(1000);
log.info("task 2 finish ");
return 1;
});
Future<Integer> submit3 = pool.submit(() -> {
log.info("task 3 running...");
Thread.sleep(1000);
log.info("task 3 finish ");
return 1;
});
log.info("shutdown...");
pool.shutdown();
//awaitTermination阻塞主线程等待的指定时间 如果主线程等待的时间内,线程池中的任务全部执行完毕,则结束等待。
// 一般我们使用 Futher中的get方法比较好。 直接阻塞等待获取结果
pool.awaitTermination(3, TimeUnit.SECONDS);
//shutdown不会阻塞主线程
log.info("other...");
//后面不能再执行其他的运行方法。因为该线程池已经终止运行。
}
}
15:20:04.030 [main] INFO org.lc.Thread_pool.T5 - shutdown...
15:20:04.030 [pool-1-thread-2] INFO org.lc.Thread_pool.T5 - task 2 running...
15:20:04.030 [pool-1-thread-1] INFO org.lc.Thread_pool.T5 - task 1 running...
15:20:05.032 [pool-1-thread-1] INFO org.lc.Thread_pool.T5 - task 1 finish
15:20:05.032 [pool-1-thread-2] INFO org.lc.Thread_pool.T5 - task 2 finish
15:20:05.032 [pool-1-thread-2] INFO org.lc.Thread_pool.T5 - task 3 running...
15:20:06.032 [pool-1-thread-2] INFO org.lc.Thread_pool.T5 - task 3 finish
15:20:06.032 [main] INFO org.lc.Thread_pool.T5 - other...
shutdownNow
@Slf4j
public class T5 {
public static void main(String[] args) throws InterruptedException {
ExecutorService pool = Executors.newFixedThreadPool(2);
Future<Integer> submit1 = pool.submit(() -> {
log.info("task 1 running...");
Thread.sleep(1000);
log.info("task 1 finish ");
return 1;
});
Future<Integer> submit2 = pool.submit(() -> {
log.info("task 2 running...");
Thread.sleep(1000);
log.info("task 2 finish ");
return 1;
});
Future<Integer> submit3 = pool.submit(() -> {
log.info("task 3 running...");
Thread.sleep(1000);
log.info("task 3 finish ");
return 1;
});
log.info("shutdownNow...");
//返回队列中的任务
List<Runnable> runnables = pool.shutdownNow();
log.info("{}",runnables);
}
}
//所有任务1和任务2被打断。任务3在队列中被返回
15:24:56.477 [main] INFO org.lc.Thread_pool.T5 - shutdownNow...
15:24:56.477 [pool-1-thread-1] INFO org.lc.Thread_pool.T5 - task 1 running...
15:24:56.477 [pool-1-thread-2] INFO org.lc.Thread_pool.T5 - task 2 running...
15:24:56.480 [main] INFO org.lc.Thread_pool.T5 - [java.util.concurrent.FutureTask@7506e922]
3、异步模式之工作线程
①定义
让有限的工作线程(Worker Thread)来轮流异步处理无限多的任务。也可以将其归类为分工模式,它的典型实现 就是线程池,也体现了经典设计模式中享元模式。
注意,不同任务类型应该使用不同的线程池,这样能够避免饥饿,并能提升效率
例如,如果一个餐馆的工人既要招呼客人(任务类型A),又要到后厨做菜(任务类型B)显然效率不咋地,分成 服务员(线程池A)与厨师(线程池B)更为合理,当然你能想到更细致的分工
②饥饿现象
固定大小线程池会有饥饿现象
- 两个工人是同一个线程池中的两个线程
- 他们要做的事情是:为客人点餐和到后厨做菜,这是两个阶段的工作
- 客人点餐:必须先点完餐,等菜做好,上菜,在此期间处理点餐的工人必须等待
- 后厨做菜:没啥说的,做就是了
- 比如工人A 处理了点餐任务,接下来它要等着 工人B 把菜做好,然后上菜,他俩也配合的蛮好
- 但现在同时来了两个客人,这个时候工人A 和工人B 都去处理点餐了,这时没人做饭了,饥饿
@Slf4j
public class T6 {
static final List<String> MENU = Arrays.asList("鱼香肉丝", "土豆肉丝", "鱼香茄子", "宫保鸡丁");
static Random random=new Random();
static String cooking(){return MENU.get(random.nextInt(MENU.size()));}
public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(2);
//占用一个线程
pool.execute(()->{
log.info("处理点餐");
//占用一个线程
Future<String> submit = pool.submit(() -> {
log.info("做菜...");
return cooking();
});
try {
log.info("上菜:{}",submit.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
});
//占用一个线程
pool.execute(()->{
log.info("处理点餐");
//占用一个线程
Future<String> submit = pool.submit(() -> {
log.info("做菜...");
return cooking();
});
try {
log.info("上菜:{}",submit.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
});
}
}
16:02:08.123 [pool-1-thread-1] INFO org.lc.Thread_pool.T6 - 处理点餐
16:02:08.123 [pool-1-thread-2] INFO org.lc.Thread_pool.T6 - 处理点餐
③饥饿问题解决
解决方法可以增加线程池的大小,不过不是根本解决方案,还是前面提到的**,不同的任务类型,采用不同的线程 池**.
结论:若在一个线程池中一个任务需要等待另一个任务的结果,需要用不同的线程池去处理
@Slf4j
public class T6 {
static final List<String> MENU = Arrays.asList("鱼香肉丝", "土豆肉丝", "鱼香茄子", "宫保鸡丁");
static Random random=new Random();
static String cooking(){return MENU.get(random.nextInt(MENU.size()));}
public static void main(String[] args) {
ExecutorService waiterPool = Executors.newFixedThreadPool(1);
ExecutorService cookPool = Executors.newFixedThreadPool(1);
waiterPool.execute(()->{
log.info("处理点餐");
Future<String> submit = cookPool.submit(() -> {
log.info("做菜...");
return cooking();
});
try {
log.info("上菜:{}",submit.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
});
waiterPool.execute(()->{
log.info("处理点餐");
Future<String> submit = cookPool.submit(() -> {
log.info("做菜...");
return cooking();
});
try {
log.info("上菜:{}",submit.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
});
}
}
16:05:34.667 [pool-1-thread-1] INFO org.lc.Thread_pool.T6 - 处理点餐
16:05:34.671 [pool-2-thread-1] INFO org.lc.Thread_pool.T6 - 做菜...
16:05:34.671 [pool-1-thread-1] INFO org.lc.Thread_pool.T6 - 上菜:鱼香茄子
16:05:34.673 [pool-1-thread-1] INFO org.lc.Thread_pool.T6 - 处理点餐
16:05:34.673 [pool-2-thread-1] INFO org.lc.Thread_pool.T6 - 做菜...
16:05:34.673 [pool-1-thread-1] INFO org.lc.Thread_pool.T6 - 上菜:宫保鸡丁
④创建多少线程池大小合适
过小会导致程序不能充分地利用系统资源、容易导致饥饿
过大会导致更多的线程上下文切换,占用更多内存
cpu密集型:
通常采用 cpu 核数 + 1 能够实现最优的 CPU 利用率,+1 是保证当线程由于页缺失故障(操作系统)或其它原因 导致暂停时,额外的这个线程就能顶上去,保证 CPU 时钟周期不被浪费
I/O 密集型运算
CPU 不总是处于繁忙状态,例如,当你执行业务计算时,这时候会使用 CPU 资源,但当你执行 I/O 操作时、远程 RPC 调用时,包括进行数据库操作时,这时候 CPU 就闲下来了,你可以利用多线程提高它的利用率。
经验公式如下
线程数 = 核数 * 期望 CPU 利用率 * 总时间(CPU计算时间+等待时间) / CPU 计算时间
例如 4 核 CPU 计算时间是 50% ,其它等待时间是 50%,期望 cpu 被 100% 利用,套用公式
4 * 100% * 100% / 50% = 8
例如 4 核 CPU 计算时间是 10% ,其它等待时间是 90%,期望 cpu 被 100% 利用,套用公式
4 * 100% * 100% / 10% = 40
4、任务调度线程池ScheduledThreadPoolExecutor
在『任务调度线程池』功能加入之前,可以使用Timer来实现定时功能。
Timer的优缺点:
- 优点
- 简单易用
- 缺点
- 所有任务串行执行,同一时间只能有一个任务在执行,前一个任务的延时和异常都将影响之后的任务
ScheduledThreadPoolExecutor的优点:
- 当核心线程数够的时候,任务会并行执行
- 异常发生不会影响其他任务
①延时执行
/**
* 延时执行任务
* @param command 任务实现
* @param delay 延迟时间
* @param unit 时间单位
* @return
*/
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
@Slf4j
public class T7 {
public static void main(String[] args) {
ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
//延时 1s执行
pool.schedule(()->{
log.info("start...");
try {
//若有异常 不会触发其他任务的终止操作
int i=1/0;
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, 1, TimeUnit.SECONDS);
// 延时1s 执行
pool.schedule(()->{
log.info("start...");
}, 1, TimeUnit.SECONDS);
log.info("mian...");
}
}
20:15:18.834 [main] INFO org.lc.Thread_pool.T7 - mian...
//延时1s后执行
20:15:19.832 [pool-1-thread-1] INFO org.lc.Thread_pool.T7 - start...
20:15:19.834 [pool-1-thread-2] INFO org.lc.Thread_pool.T7 - start...
②周期循环执行
若任务的执行时间超过了 period的设置周期循环时间 则会执行该任务后马上执行下一个任务 若任务的执行时间小于了 period的设置周期循环时间,则会按照period周期时间循环
/**
* 延迟任务执行后,按照固定速率运行
* @param command 任务实现
* @param initialDelay 延期时间
* @param period 周期循环时间
* @param unit 时间单位
* @return
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
@Slf4j
public class T8 {
public static void main(String[] args) {
ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
log.info("start...");
//延期2s 执行该任务
// 周期为4s
pool.scheduleAtFixedRate(()->{
log.info("running..");
try {
//若任务的执行时间超过了 period的设置周期循环时间 则会执行该任务后马上执行下一个任务
//若任务的执行时间小于了 period的设置周期循环时间,则会按照period周期时间循环
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, 2, 4, TimeUnit.SECONDS);
}
}
21:04:07.118 [main] INFO org.lc.Thread_pool.T8 - start...
21:04:09.160 [pool-1-thread-1] INFO org.lc.Thread_pool.T8 - running..
21:04:13.160 [pool-1-thread-1] INFO org.lc.Thread_pool.T8 - running..
21:04:17.159 [pool-1-thread-1] INFO org.lc.Thread_pool.T8 - running..
③周期循环执行(总是会加上周期时间)
总是运行该任务完毕后,再按照指定周期开始运行
/**
* 延迟任务执行后,任务间隔为:任务运行时间+周期时间
* @param command 任务实现
* @param initialDelay 延期时间
* @param period 周期循环时间
* @param unit 时间单位
* @return
*/
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
@Slf4j
public class T8 {
public static void main(String[] args) {
ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
log.info("start...");
//延期2s 执行该任务
// 任务3s 周期为2s 距离下一次任务5s
pool.scheduleWithFixedDelay(()->{
log.info("running..");
try {
//总是运行该任务完毕后,再按照指定周期开始运行
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, 2, 2, TimeUnit.SECONDS);
}
}
21:11:38.213 [main] INFO org.lc.Thread_pool.T8 - start...
21:11:40.257 [pool-1-thread-1] INFO org.lc.Thread_pool.T8 - running..
21:11:45.259 [pool-1-thread-1] INFO org.lc.Thread_pool.T8 - running..
5、处理线程池的异常
①主动捕捉异常
@Slf4j
public class T9 {
public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(1);
pool.execute(()->{
// 主动捕捉异常
try {
log.info("task1");
int i=1/0;
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
21:50:55.261 [pool-1-thread-1] INFO org.lc.Thread_pool.T9 - task1
java.lang.ArithmeticException: / by zero
at org.lc.Thread_pool.T9.lambda$main$0(T9.java:24)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
②配合submit提交任务,返回Futrue对象来处理异常
无异常,则返回指定的值
有异常,返回异常对象信息
@Slf4j
public class T9 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService pool = Executors.newFixedThreadPool(1);
Future<Boolean> task1 = pool.submit(() -> {
log.info("task1");
int i = 1 / 0;
return true;
});
log.info("{}",task1.get());
}
}
21:52:42.990 [pool-1-thread-1] INFO org.lc.Thread_pool.T9 - task1
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at org.lc.Thread_pool.T9.main(T9.java:27)
Caused by: java.lang.ArithmeticException: / by zero
at org.lc.Thread_pool.T9.lambda$main$0(T9.java:24)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
6、定时执行任务应用
如何让每周四 18:00:00 定时执行任务?
public class T10 {
public static void main(String[] args) {
// 获得当前时间
LocalDateTime now = LocalDateTime.now();
// 获取本周四 18:00:00.000
LocalDateTime thursday = now.with(DayOfWeek.THURSDAY).withHour(18).withMinute(0).withSecond(0).withNano(0);
// 如果当前时间已经超过 本周四 18:00:00.000, 那么找下周四 18:00:00.000
if (now.compareTo(thursday) >= 0) {
thursday = thursday.plusWeeks(1);
}
// 计算时间差,即延时执行时间
long initialDelay = Duration.between(now, thursday).toMillis();
// 计算间隔时间,即 1 周的毫秒值
long oneWeek = 7 * 24 * 3600 * 1000;
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
System.out.println("开始时间:" + new Date());
executor.scheduleAtFixedRate(() -> {
System.out.println("执行时间:" + new Date());
}, initialDelay, oneWeek, TimeUnit.MILLISECONDS);
}
}
7、Fork/Join
①概念
Fork/Join 是 JDK 1.7 加入的新的线程池实现,它体现的是一种分治思想,适用于能够进行任务拆分的 cpu 密集型 运算
所谓的任务拆分,是将一个大任务拆分为算法上相同的小任务,直至不能拆分可以直接求解。跟递归相关的一些计 算,如归并排序、斐波那契数列、都可以用分治思想进行求解
Fork/Join 在分治的基础上加入了多线程,可以把每个任务的分解和合并交给不同的线程来完成,进一步提升了运 算效率
Fork/Join 默认会创建与 cpu 核心数大小相同的线程池
- 采用 “工作窃取”模式(work-stealing): 当执行新的任务时它可以将其拆分分成更小的任务执行,并将小任务加到线程队列中,然后再从一个随机线程的队列中偷一个并把它放在自己的队列中。
- 相对于一般的线程池实现,fork/join框架的优势体现在对其中包含的任务的处理方式上.在一般的线程池中,如果一个线程正在执行的任务由于某些原因无法继续运行,那么该线程会处于等待状态。而在fork/join框架实现中,如果某个子问题由于等待另外一个子问题的完成而无法继续运行。那么处理该子问题的线程会主动寻找其他尚未运行的子问题来执行.这种方式减少了线程的等待时间,提高了性能。
②使用
提交给 Fork/Join 线程池的任务需要继承 RecursiveTask(有返回值)或 RecursiveAction(没有返回值),例如下 面定义了一个对 1~n 之间的整数求和的任务
@Slf4j
public class T11 {
public static void main(String[] args) {
//4个线程
ForkJoinPool forkJoinPool=new ForkJoinPool(4);
System.out.println(forkJoinPool.invoke(new AddTask1(5)));
}
}
@Slf4j
class AddTask1 extends RecursiveTask<Integer> {
int n;
public AddTask1(int n) {
this.n = n;
}
@Override
public String toString() {
return "{" + this.n + "}";
}
@Override
protected Integer compute() {
// 如果为n=1则可以返回了
if (n == 1) {
log.info("join() {}", n);
return n;
}
//将任务进行拆分
AddTask1 t1 = new AddTask1(n - 1);
//加入到线程队列中
t1.fork();
log.debug("fork() {} + {}", n, t1);
// 合并结果
int result=n+t1.join();
log.debug("join() {} + {} = {}", n, t1, result);
return result;
}
}
12:58:16.090 [ForkJoinPool-1-worker-0] DEBUG org.lc.Thread_pool.AddTask1 - fork() 2 + {1}
12:58:16.090 [ForkJoinPool-1-worker-2] DEBUG org.lc.Thread_pool.AddTask1 - fork() 4 + {3}
12:58:16.094 [ForkJoinPool-1-worker-0] INFO org.lc.Thread_pool.AddTask1 - join() 1
12:58:16.094 [ForkJoinPool-1-worker-0] DEBUG org.lc.Thread_pool.AddTask1 - join() 2 + {1} = 3
12:58:16.090 [ForkJoinPool-1-worker-3] DEBUG org.lc.Thread_pool.AddTask1 - fork() 3 + {2}
12:58:16.090 [ForkJoinPool-1-worker-1] DEBUG org.lc.Thread_pool.AddTask1 - fork() 5 + {4}
12:58:16.094 [ForkJoinPool-1-worker-3] DEBUG org.lc.Thread_pool.AddTask1 - join() 3 + {2} = 6
12:58:16.094 [ForkJoinPool-1-worker-2] DEBUG org.lc.Thread_pool.AddTask1 - join() 4 + {3} = 10
12:58:16.094 [ForkJoinPool-1-worker-1] DEBUG org.lc.Thread_pool.AddTask1 - join() 5 + {4} = 15
8、J.U.C(java.util.concurrent)
1.AQS原理
1)概述
全称AbstractQueuedSynchronizer,阻塞式锁(和synchronized类似)和相关的同步器工具的框架
特点:
- 用 state 属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取 锁和释放锁
- getState - 获取 state 状态
- setState - 设置 state 状态
- compareAndSetState - cas 机制设置 state 状态
- 独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源
- 提供了基于 FIFO(先进先出)的等待队列,类似于 Monitor 的 EntryList
- 条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet
子类主要实现这样一些方法(默认抛出 UnsupportedOperationException)
tryAcquire 获得锁是否成功
tryRelease 释放锁是否成功
tryAcquireShared
tryReleaseShared
isHeldExclusively 是否是独占锁
获取锁的姿势:
tryAcquire:如果获得锁成功返回true,否则返回false
// 如果获取锁失败
if (!tryAcquire(arg)) {
// 入队, 可以选择阻塞当前线程
park unpark
}
释放锁的姿势:
tryRelease: 释放锁成功返回true,否则返回false
// 如果释放锁成功
if (tryRelease(arg)) {
// 让阻塞线程恢复运行
}
2)自定义不可重入锁(独占锁)
/**
* 自定义不可重入锁(独占锁)
*/
@Slf4j
class MyLock implements Lock {
//独占锁 同步器类
class MySync extends AbstractQueuedSynchronizer{
//private volatile int state; 初始状态为0
// 这里我们 1表示获得了锁 ; 0表示没有获得锁
@Override
protected boolean tryAcquire(int arg) {
// 防止其他线程修改同时state,使用cas修改保证state原子性
if (compareAndSetState(0, 1)) {
// 修改成功
// 设置持有者的线程。设置owner为当前线程 (类似synchronized中的monitor)
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
//加锁失败
return false;
}
@Override
protected boolean tryRelease(int arg) {
// private transient Thread exclusiveOwnerThread;
//释放锁后,将owner设为null
setExclusiveOwnerThread(null);
// 释放锁的时候 没有其他线程竞争 因为只有获得了锁才能释放锁
// 这里注意:private volatile int state; state为volatile修饰的变量,或加入写屏障,保证有序性。
setState(0);
return true;
}
/**
* 是否持有独占锁
* @return
*/
@Override
protected boolean isHeldExclusively() {
return getState()==1;
}
public Condition newCondition() {
return new ConditionObject();
}
}
private MySync sync=new MySync();
/**
* 加锁 加锁失败,放入队列中等待
*/
@Override
public void lock() {
sync.acquire(1);
}
/**
*加锁 可打断
* @throws InterruptedException
*/
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
/**
* 尝试加锁(一次)
* @return
*/
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
/**
* 尝试加锁 带超时时间
* @param time
* @param unit
* @return
* @throws InterruptedException
*/
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
// 将时间转换为纳秒
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
/**
* 解锁
*/
@Override
public void unlock() {
//release会调用我们覆盖的tryTelease方法(设置state为0,owner为null)。并唤醒阻塞等待的线程
sync.release(1);
}
/**
* 创建阻塞队列条件变量
* @return
*/
@Override
public Condition newCondition() {
return sync.newCondition();
}
}
正常测试:
@Slf4j
public class T1 {
public static void main(String[] args) {
MyLock myLock=new MyLock();
new Thread(()->{
log.info("尝试获得锁...");
myLock.lock();
try {
log.info("开始执行...");
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
log.info("unlock...");
myLock.unlock();
}
},"t1").start();
new Thread(()->{
log.info("尝试获得锁...");
myLock.lock();
try {
log.info("开始执行...");
}finally {
log.info("unlock...");
myLock.unlock();
}
},"t2").start();
}
}
22:53:48.912 [t1] INFO org.lc.juc_aqs.T1 - 尝试获得锁...
22:53:48.912 [t2] INFO org.lc.juc_aqs.T1 - 尝试获得锁...
22:53:48.915 [t1] INFO org.lc.juc_aqs.T1 - 开始执行...
22:53:50.916 [t1] INFO org.lc.juc_aqs.T1 - unlock...
22:53:50.916 [t2] INFO org.lc.juc_aqs.T1 - 开始执行...
22:53:50.916 [t2] INFO org.lc.juc_aqs.T1 - unlock...
独占锁测试:
@Slf4j
public class T1 {
public static void main(String[] args) {
MyLock myLock=new MyLock();
new Thread(()->{
//两次加锁
myLock.lock();
log.info("获得锁1...");
myLock.lock();
log.info("获得锁2...");
try {
log.info("开始执行...");
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
log.info("unlock...");
myLock.unlock();
}
},"t1").start();
}
}
22:57:02.687 [t1] INFO org.lc.juc_aqs.T1 - 获得锁1...
2.ReentrantLock原理
1)非公平锁实现原理
通过ReentrantLock默认的构造器来看
public ReentrantLock() {
sync = new NonfairSync();
}
static final class FairSync extends Sync {}
abstract static class Sync extends AbstractQueuedSynchronizer {}
FairSync 继承 AQS(AbstractQueuedSynchronizer)
2)可重入锁原理
3)可打断原理
4)公平锁实现原理
5)条件变量实现原理
3.ReentrantReadWriteLock 读写锁
当读操作远远高于写操作时,这时候使用 读写锁
让 读-读
(可以并发多个线程同时读取操作)可以并发,提高性能。 类似于数据库中的 select ... from ... lock in share mode
提供一个 数据容器类 内部分别使用读锁保护数据的read()
方法,写锁保护数据的write()
方法
@Slf4j
class DataContranier{
private Object object;
private ReentrantReadWriteLock r=new ReentrantReadWriteLock();
// 读锁
ReentrantReadWriteLock.ReadLock readLock=r.readLock();
// 写锁
ReentrantReadWriteLock.WriteLock writeLock=r.writeLock();
//读取
public Object read() throws InterruptedException {
log.info("获取读锁...");
readLock.lock();
try {
log.info("读取");
Thread.sleep(1000);
return object;
}finally {
log.info("释放读锁...");
readLock.unlock();
}
}
// 修改
public void write() throws InterruptedException {
log.info("获取写锁...");
writeLock.lock();
try {
log.info("写入");
Thread.sleep(1000);
}finally {
log.info("释放写锁...");
readLock.unlock();
}
}
}
读-读(可并发)
1)通过下面我们可以发现,多个线程可以实现并发读取的操作。
public class T1 {
public static void main(String[] args) {
DataContranier contranier=new DataContranier();
new Thread(()->{
try {
contranier.read();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"t1").start();
new Thread(()->{
try {
contranier.read();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"t2").start();
}
}
15:09:32.745 [t1] INFO org.lc.reentrantreadwrite_lock.DataContranier - 获取读锁...
15:09:32.745 [t2] INFO org.lc.reentrantreadwrite_lock.DataContranier - 获取读锁...
15:09:32.748 [t2] INFO org.lc.reentrantreadwrite_lock.DataContranier - 读取
15:09:32.748 [t1] INFO org.lc.reentrantreadwrite_lock.DataContranier - 读取
15:09:33.748 [t2] INFO org.lc.reentrantreadwrite_lock.DataContranier - 释放读锁...
15:09:33.748 [t1] INFO org.lc.reentrantreadwrite_lock.DataContranier - 释放读锁...
读-写(互斥)
2)通过以下发现,对一个数据的读取
和写入
操作是互斥的
public class T1 {
public static void main(String[] args) {
DataContranier contranier=new DataContranier();
new Thread(()->{
try {
contranier.read();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"t1").start();
new Thread(()->{
try {
contranier.write();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"t2").start();
}
}
15:16:51.366 [t2] INFO org.lc.reentrantreadwrite_lock.DataContranier - 获取写锁...
15:16:51.366 [t1] INFO org.lc.reentrantreadwrite_lock.DataContranier - 获取读锁...
15:16:51.368 [t2] INFO org.lc.reentrantreadwrite_lock.DataContranier - 写入
15:16:52.369 [t2] INFO org.lc.reentrantreadwrite_lock.DataContranier - 释放写锁...
15:16:52.369 [t1] INFO org.lc.reentrantreadwrite_lock.DataContranier - 读取
15:16:53.369 [t1] INFO org.lc.reentrantreadwrite_lock.DataContranier - 释放读锁...
写-写(互斥)
3)4)注意事项
锁重入时升级不支持:
- 读锁不支持条件变量
- 即持有读锁的情况下去获取写锁,会导致获取写锁永久等待
r.lock();
try { // ...
w.lock();
try {
// ...
} finally{
w.unlock();
}
} finally{
r.unlock();
}
锁重入时降级支持
- 持有写锁的情况下去获取读锁支持
//伪代码
class CachedData {
Object data;
// 是否有效,如果失效,需要重新计算 data
volatile boolean cacheValid;
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
void processCachedData() {
rwl.readLock().lock();
if (!cacheValid) {
// 获取写锁前必须释放读锁
rwl.readLock().unlock();
rwl.writeLock().lock();
// try {
// 判断是否有其它线程已经获取了写锁、更新了缓存, 避免重复更新
if (!cacheValid) {
data = ...
cacheValid = true;
}
// 降级为读锁, 释放写锁, 这样能够让其它线程读取缓存
rwl.readLock().lock();
} finally{
rwl.writeLock().unlock();
}
// 自己用完数据, 释放读锁
try{
use(data);
} finally{
rwl.readLock().unlock();
}
}
}
读写锁原理
5)4.StampedLock(带戳的读写锁)
该类自 JDK 8 加入,是为了进一步优化读性能,它的特点是在使用读锁、写锁时都必须配合【戳】使用
加解读锁:
long stamp = lock.readLock();
lock.unlockRead(stamp);
加解写锁:
long stamp = lock.writeLock();
lock.unlockWrite(stamp)
乐观读,StampedLock 支持 tryOptimisticRead() 方法(乐观读),读取完毕后需要做一次 戳校验
如果校验通 过,表示这期间确实没有写操作,数据可以安全使用,如果校验没通过,需要重新获取读锁,保证数据安全。
//仅仅返回一个戳 没有加任何锁
long stamp = lock.tryOptimisticRead();
//若其他线程对戳进程更改,则验戳失败
// 验戳
if(!lock.validate(stamp)){
// 锁升级 升级为读锁 保证线程互斥
}
提供一个 数据容器类 内部分别使用读锁保护数据的
read()
方法,写锁保护数据的write()
方法
@Slf4j
class DataContainerStamped{
// 模拟读取和修改的数据
private int data;
// 带戳的读写锁
private final StampedLock lock=new StampedLock();
public DataContainerStamped(int data) {
this.data = data;
}
//读操作
public int read(int readTime) throws InterruptedException {
// 乐观读获取戳(此操作只是返回一个戳) 读操作并发所以可以不用加锁 只是判断该戳有没有被修改
long stamp = lock.tryOptimisticRead();
log.info("optimisitic read locking:{}",stamp);
// 模拟业务处理
Thread.sleep(readTime);
//验戳:若在此期间 戳没有被其他线程修改过
if (lock.validate(stamp)) {
log.info("read success:{}",data);
//验证成功 直接返回该值
return data;
}
//验戳失败
// 加读锁 保持互斥
log.info("updating to read lock...{}",stamp);
try {
stamp = lock.readLock();
log.info("read lock {}", stamp);
Thread.sleep(readTime);
log.info("read finish {} data {}",stamp,data);
return data;
}finally {
//释放读锁
log.info("read unlock:{}",stamp);
lock.unlockRead(stamp);
}
}
//写操作
public void write(int newData) throws InterruptedException {
long stamp = lock.writeLock();
log.info("write log {}",stamp);
try {
Thread.sleep(2000);
this.data=newData;
}finally {
log.info("write unlock:{}",stamp);
lock.unlockWrite(stamp);
}
}
}
读-读(并发)
1)我们可以发现都是走的乐观读的流程,没有走加读锁的流程,因为我们读是可以并发的,我们只要验证戳即可。
@Slf4j
public class T2 {
public static void main(String[] args) {
DataContainerStamped dataContainerStamped = new DataContainerStamped(1);
new Thread(()->{
try {
dataContainerStamped.read(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
},"t1").start();
new Thread(()->{
try {
dataContainerStamped.read(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
},"t2").start();
}
}
11:49:40.027 [t2] INFO org.lc.reentrantreadwrite_lock.DataContainerStamped - optimisitic read locking:256
11:49:40.027 [t1] INFO org.lc.reentrantreadwrite_lock.DataContainerStamped - optimisitic read locking:256
11:49:41.030 [t2] INFO org.lc.reentrantreadwrite_lock.DataContainerStamped - read success:1
11:49:41.030 [t1] INFO org.lc.reentrantreadwrite_lock.DataContainerStamped - read success:1
读-写(互斥)
2)我们可以发现,t1线程读取数据的方法先使用乐观读的方式验证戳,此时t2线程已经获得了写锁,将数据进行了修改,所以此时t1线程验戳失败,会进入锁升级,加读锁的流程,但是此时锁被t2线程写锁获取,t1线程阻塞。当t2线程释放写锁的时候,t1线程获得读锁,读取数据成功。
@Slf4j
public class T2 {
public static void main(String[] args) throws InterruptedException {
DataContainerStamped dataContainerStamped = new DataContainerStamped(1);
new Thread(()->{
try {
dataContainerStamped.read(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
},"t1").start();
new Thread(()->{
try {
dataContainerStamped.write(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
},"t2").start();
}
}
12:06:48.315 [t2] INFO org.lc.reentrantreadwrite_lock.DataContainerStamped - write log 384
12:06:48.315 [t1] INFO org.lc.reentrantreadwrite_lock.DataContainerStamped - optimisitic read locking:256
12:06:49.320 [t1] INFO org.lc.reentrantreadwrite_lock.DataContainerStamped - updating to read lock...256
12:06:50.320 [t2] INFO org.lc.reentrantreadwrite_lock.DataContainerStamped - write unlock:384
12:06:50.320 [t1] INFO org.lc.reentrantreadwrite_lock.DataContainerStamped - read lock 513
12:06:51.320 [t1] INFO org.lc.reentrantreadwrite_lock.DataContainerStamped - read finish 513 data 2
12:06:51.320 [t1] INFO org.lc.reentrantreadwrite_lock.DataContainerStamped - read unlock:513
注意事项:
- StampedLock 不支持条件变量 (不能使用类似await,和signal方法)
- StampedLock 不支持可重入
5.Semaphore信号量
/ˈseməfɔː(r)/
信号量,用来限制能同时访问共享资源的线程上限
acquire();
获得许可
release();
释放许可
1)基本使用
@Slf4j
public class T2 {
public static void main(String[] args) {
//最多允许同时3个线程允许 默认为非公平锁
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 5; i++) {
new Thread(()->{
try {
//获得许可
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
log.info("running...");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
log.info("release...");
//释放许可占有量
semaphore.release();
}
},""+i).start();
}
}
}
15:51:57.575 [2] INFO org.lc.juc.T2 - running...
15:51:57.575 [1] INFO org.lc.juc.T2 - running...
15:51:57.575 [0] INFO org.lc.juc.T2 - running...
15:51:58.577 [2] INFO org.lc.juc.T2 - release...
15:51:58.577 [1] INFO org.lc.juc.T2 - release...
15:51:58.577 [0] INFO org.lc.juc.T2 - release...
15:51:58.577 [4] INFO org.lc.juc.T2 - running...
15:51:58.577 [3] INFO org.lc.juc.T2 - running...
15:51:59.578 [3] INFO org.lc.juc.T2 - release...
15:51:59.578 [4] INFO org.lc.juc.T2 - release...
我们可以通过以上发现,做多3个线程并行执行,后续线程执行阻塞。
2)数据库连接池的改进
Connection连接
//模拟连接对象实现
class MockConnection implements Connection{
//....
}
public class T3 {
public static void main(String[] args) {
Pool pool = new Pool(2);
for (int i = 0; i < 5; i++) {
new Thread(()->{
Connection con = pool.borrow();
try {
//模拟 随机1秒内的执行时间
Thread.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
pool.free(con);
},""+(i+1)).start();
}
}
}
@Slf4j
class Pool{
// 连接池大小
private final int poolSize;
// 连接对象数组
private Connection[] connections;
// 连接状态数组 0空闲 1表示繁忙
private AtomicIntegerArray states;
// 信号量
private Semaphore semaphore;
// 构造方法初始化
public Pool(int poolSize) {
//让许可数和池的大小一致
this.semaphore = new Semaphore(poolSize);
this.poolSize = poolSize;
this.connections = new Connection[poolSize];
this.states=new AtomicIntegerArray(new int[poolSize]);
for (int i = 0; i < poolSize; i++) {
// 初始化所有连接
connections[i]=new MockConnection("连接"+(i+1));
}
}
// 借连接
public Connection borrow() {
try {
//获得许可
semaphore.acquire(); //没有获得许可的线程在此等待
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 0; i < poolSize; i++) {
// 获取空闲连接
// 使用cas操作 将该位置的连接更新为1 表示该位置的连接已被占用
if(states.compareAndSet(i, 0, 1)){
// 获得连接
log.info("借到连接 {}",connections[i]);
return connections[i];
}
}
// 不会执行到这,因为已经获得了许可那么池中肯定有空闲的线程
return null;
}
/**
* 归还连接
* @param connection 归还的连接对象
*/
public void free(Connection connection){
for (int i = 0; i < poolSize; i++) {
// 找到归还的连接对象
if(connections[i]==connection){
// 这里无需cas ,应为传过来的Conncetion对象已确定
states.set(i, 0);
//释放许可即可
log.info("已释放该连接{}...",connection);
semaphore.release();
break;
}
}
}
}
acquire原理(获得许可)
3)release原理(释放许可)
4)6.CountdownLatch(倒计时锁)
用来进行线程同步协作,等待所有线程完成倒计时(相当于升级版的join)。
其中构造参数用来初始化等待计数值,await() 用来等待计数归零,countDown() (会阻塞其他线程)用来让计数减一
1)基本使用
@Slf4j
public class T3 {
public static void main(String[] args) throws InterruptedException {
//这里线程数要已知
CountDownLatch latch = new CountDownLatch(3);
new Thread(()->{
log.info("running...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//占有锁数量减1
latch.countDown();
log.info("over...");
},"t1").start();
new Thread(()->{
log.info("running...");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//占有锁数量减1
latch.countDown();
log.info("over...");
},"t2").start();
new Thread(()->{
log.info("running...");
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
//占有锁数量减1
latch.countDown();
log.info("over...");
},"t3").start();
log.info("waiting...");
//判断state是否为0,为0,则所有线程执行了countDown方法,否则还有线程未执行countDown。将已执行的countDown放入阻塞队列,通过CAS自旋再次判断state是否为0
latch.await();
log.info("end...");
}
}
16:43:39.788 [t1] INFO org.lc.juc.T3 - running...
16:43:39.788 [t2] INFO org.lc.juc.T3 - running...
16:43:39.788 [main] INFO org.lc.juc.T3 - waiting...
16:43:39.788 [t3] INFO org.lc.juc.T3 - running...
16:43:40.791 [t1] INFO org.lc.juc.T3 - over...
16:43:41.290 [t3] INFO org.lc.juc.T3 - over...
16:43:44.791 [t2] INFO org.lc.juc.T3 - over...
16:43:44.791 [main] INFO org.lc.juc.T3 - end...
2)线程池配合使用
@Slf4j
public class T3 {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3);
ExecutorService pool = Executors.newFixedThreadPool(4);
pool.submit(()->{
log.info("running...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//占有锁数量减1
latch.countDown();
log.info("over...");
});
pool.submit(()->{
log.info("running...");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
//占有锁数量减1
latch.countDown();
log.info("over...");
});
pool.submit(()->{
log.info("running...");
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
//占有锁数量减1
latch.countDown();
log.info("over...");
});
pool.submit(()->{
log.info("waiting...");
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("end...");
});
}
16:51:57.747 [pool-1-thread-1] INFO org.lc.juc.T3 - running...
16:51:57.747 [pool-1-thread-3] INFO org.lc.juc.T3 - running...
16:51:57.747 [pool-1-thread-2] INFO org.lc.juc.T3 - running...
16:51:57.747 [pool-1-thread-4] INFO org.lc.juc.T3 - waiting...
16:51:58.250 [pool-1-thread-2] INFO org.lc.juc.T3 - over...
16:51:58.749 [pool-1-thread-1] INFO org.lc.juc.T3 - over...
16:51:59.250 [pool-1-thread-3] INFO org.lc.juc.T3 - over...
16:51:59.250 [pool-1-thread-4] INFO org.lc.juc.T3 - end...
3)游戏加载模拟
public class T4 {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(10);
ExecutorService service = Executors.newFixedThreadPool(10);
Random random=new Random();
String[] progress=new String[10];
System.out.println("游戏加载中请稍后...");
// 新建十个任务
for (int i = 0; i < 10; i++) {
int k=i;
service.submit(()->{
for (int j = 0; j <=100; j++) {
try {
Thread.sleep(random.nextInt(100));
} catch (InterruptedException e) {
e.printStackTrace();
}
progress[k]=j+"%";
System.out.print("\r "+ Arrays.toString(progress));
}
//任务执行完之后占有锁减1
countDownLatch.countDown();
});
}
countDownLatch.await();
System.out.println("\n游戏开始...");
service.shutdown();
}
}
游戏加载中请稍后...
[100%, 100%, 100%, 100%, 100%, 100%, 100%, 100%, 100%, 100%]
游戏开始...
7.CyclicBarrier (可复用的倒计时锁)
循环栅栏,用来进行线程协作,等待线程满足某个计数。构造时设置『计数个数』,每个线程执 行到某个需要“同步”的时刻调用 await() 方法进行等待,当等待的线程数满足『计数个数』时,继续执行
我们平常使用CountdownLatch的时候,当计数减到0时,就不能再对此对象进行重用。
此时我们使用CyclicBarrier 循环栅栏的方式对此计数进行重用
1)基本使用
@Slf4j
public class T5 {
public static void main(String[] args) {
//注意:这里我们的线程数要和栅栏的计数个数一致 才能达到此效果。若为3个,那么执行了两次t1,一次t2
ExecutorService pool = Executors.newFixedThreadPool(2);
CyclicBarrier cyclicBarrier = new CyclicBarrier(2,()->{
log.info("task1 and task2 finsh...");
});
//实现CyclicBarrier的重用
for (int i = 0; i < 2; i++) {
pool.submit(() -> {
log.info("task1 start...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
//相当于CountdownLatch中的CountDown()方法 对信号量减1
// 采用reentrantlock加锁的方式,会阻塞其他线程对此方法的调用
cyclicBarrier.await();
log.info("task1 end...");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
pool.submit(() -> {
log.info("task2 start...");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
//相当于CountdownLatch中的CountDown()方法 对信号量减1
cyclicBarrier.await();
log.info("task2 end...");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
}
}
}
18:05:58.883 [pool-1-thread-1] INFO org.lc.juc.T5 - task1 start...
18:05:58.883 [pool-1-thread-2] INFO org.lc.juc.T5 - task2 start...
18:06:00.885 [pool-1-thread-2] INFO org.lc.juc.T5 - task1 and task2 finsh...
18:06:00.885 [pool-1-thread-2] INFO org.lc.juc.T5 - task2 end...
18:06:00.885 [pool-1-thread-1] INFO org.lc.juc.T5 - task1 end...
18:06:00.885 [pool-1-thread-1] INFO org.lc.juc.T5 - task1 start...
18:06:00.885 [pool-1-thread-2] INFO org.lc.juc.T5 - task2 start...
18:06:02.887 [pool-1-thread-2] INFO org.lc.juc.T5 - task1 and task2 finsh...
18:06:02.887 [pool-1-thread-2] INFO org.lc.juc.T5 - task2 end...
18:06:02.887 [pool-1-thread-1] INFO org.lc.juc.T5 - task1 end...
8.线程安全集合类概述
线程安全集合类可以分为三大类:
- 遗留的线程安全集合如 Hashtable , Vector
- 使用 Collections 装饰的线程安全集合,如:
- Collections.synchronizedCollection
- Collections.synchronizedList
- Collections.synchronizedMap
- Collections.synchronizedSet
- Collections.synchronizedNavigableMap
- Collections.synchronizedNavigableSet
- Collections.synchronizedSortedMap
- Collections.synchronizedSortedSet
- java.util.concurrent.*
重点介绍 java.util.concurrent.* 下的线程安全集合类,可以发现它们有规律,里面包含三类关键词: Blocking、CopyOnWrite、Concurrent
- Blocking 大部分实现基于锁,并提供用来阻塞的方法
- CopyOnWrite 之类容器修改开销相对较重
- Concurrent 类型的容器
- 内部很多操作使用 cas 优化,一般可以提供较高吞吐量
- 弱一致性
- 遍历时弱一致性,例如,当利用迭代器遍历时,如果容器发生修改,迭代器仍然可以继续进行遍 历,这时内容是旧的
- 求大小弱一致性,size 操作未必是 100% 准确
- 读取弱一致性
遍历时如果发生了修改,对于非安全容器来讲,使用 fail-fast 机制也就是让遍历立刻失败,抛出 ConcurrentModificationException,不再继续遍历
9.ConcurrentHashMap
我们对单词数进行统计:
生成测试数据: 模拟26个字母,每个字母200个,随机平均分配在26个.txt的文件中。每一行代表一个单词
static final String ALPHA = "abcedfghijklmnopqrstuvwxyz";
private static void generateWord() {
int length = ALPHA.length();
int count = 200;
List<String> list = new ArrayList<>(length * count);
for (int i = 0; i < length; i++) {
char ch = ALPHA.charAt(i);
for (int j = 0; j < count; j++) {
list.add(String.valueOf(ch));
}
}
Collections.shuffle(list);
for (int i = 0; i < 26; i++) {
try (PrintWriter out = new PrintWriter(new OutputStreamWriter(new FileOutputStream( (i + 1) + ".txt")))) {
String collect = list.subList(i * count, (i + 1) * count).stream().collect(Collectors.joining("\n"));
out.print(collect);
} catch (IOException e) {
}
}
}
读取模板代码
private static <V> void demo(Supplier<Map<String, V>> supplier, BiConsumer<Map<String, V>, List<String>> consumer) {
Map<String, V> counterMap = supplier.get();
List<Thread> ts = new ArrayList<>();
for (int i = 1; i <= 26; i++) {
int idx = i;
Thread thread = new Thread(() -> {
List<String> words = readFromFile(idx);
consumer.accept(counterMap, words);
});
ts.add(thread);
}
ts.forEach(t -> t.start());
ts.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System.out.println(counterMap);
}
public static List<String> readFromFile(int i) {
ArrayList<String> words = new ArrayList<>();
try (BufferedReader in = new BufferedReader(new InputStreamReader(new FileInputStream( i + ".txt")))) {
while (true) {
String word = in.readLine();
if (word == null) {
break;
}
words.add(word);
}
return words;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
测试介绍:
- 一是提供一个 map 集合,用来存放每个单词的计数结果,key 为单词,value 为计数
- 二是提供一组操作,保证计数的安全性,会传递 map 集合以及 单词 List
正确结果输出应该是每个单词出现 200 次
{a=200, b=200, c=200, d=200, e=200, f=200, g=200, h=200, i=200, j=200, k=200, l=200, m=200, n=200, o=200, p=200, q=200, r=200, s=200, t=200, u=200, v=200, w=200, x=200, y=200, z=200}
1)HashMap测试
public static void main(String[] args) {
demo(
//存储单词的集合类型
//new HashMap<String,Integer>()
()->new ConcurrentHashMap<String,Integer>(),
//对单词进行计数
(map,words)->{
for (String word:words){
Integer counter = map.get(word);
int newValue = counter == null ? 1 : counter + 1;
map.put(word, newValue);
}
}
);
}
{a=195, b=199, c=195, d=196, e=190, f=195, g=199, h=181, i=199, j=197, k=196, l=195, m=194, n=197, o=199, p=174, q=198, r=198, s=195, t=194, u=192, v=195, w=200, x=197, y=194, z=197}
我们发现,HashMap并非线程安全, 会存在指令交错的情况。ConcurrentHashMap虽然是线程安全的,但是对单词进行计数的组合方法并不是线程安全的,不能保证原子性
2)使用computeIfAbsent保证原子性
public static void main(String[] args) {
demo(
//使用LongAdder作为值的累加操作类型
()->new ConcurrentHashMap<String,LongAdder>(),
//对单词进行计数
(map,words)->{
for (String word:words){
//new LongAdder() 默认为0
//如果缺少一个key,则计算生成一个value,然后将key value放入map
LongAdder longAdder = map.computeIfAbsent(word, (key) -> new LongAdder());
//如果该key存在,那么返回该key对应的value,则不会执行生成value操作(`(key) -> new LongAdder()`)
longAdder.increment();
// 保证我们这几个操作的原子性
}
}
);
}
{a=200, b=200, c=200, d=200, e=200, f=200, g=200, h=200, i=200, j=200, k=200, l=200, m=200, n=200, o=200, p=200, q=200, r=200, s=200, t=200, u=200, v=200, w=200, x=200, y=200, z=200}