# 多线程
# 目录
# 锁
由于多个线程对同一个对象进行写操作,其必然出现资源争抢,引发线程安全问题,而锁就是为了解决这种问题而诞生的,java中如下几种锁
- 自旋锁:为了不放弃CPU执行事件,循环使用
CAS技术对数据更新,直至成功,使用while(true)方式循环CAS操作为自旋锁的典型实现- 悲观锁:在执行前假定会发生冲突,所以从读取数据开始就对行为上锁,参考
synchronized关键字- 乐观锁:提出了版本的概念,在读取数据时,会将当前数据版本号一并读出来,每次写时候会对版本进行修改,其他线程写的时候,会先判断当前版本号和读取时是否一致,一致才会进行写入
- 独享锁(写):给资源加上写锁,线程可以修改资源,其他线程不能再加锁,
synchronized也属于这种- 共享锁(读):给资源加上读锁,其他线程也只能加读锁,不能加写锁
- 可重入锁、不可重入锁:线程拿到一把锁后,可以自由进入同一把锁所同步的其他代码,
synchronized属于可重入锁- 公平锁、非公平锁:争抢锁的顺序,按照先来后到的顺序
Java中有几种重要的锁实现方式:synchronized,ReentrantLock,ReentrantReadWriteLock
# synchronized(同步关键字)
这属于java中最基本的线程通信机制,基于对象监视器实现,上面介绍过属于可重入,独享,悲观锁类型,在使用时,可用于方法(静态/非静态),同步代码块,同时由于Happens-Before规则的存在,synchronized也可以用来保证可见性
锁失效问题
synchronized在非静态方法上,监视器所监视的对象为当前对象this,因此如果多个线程是通过不同对象调用该方法时,同步作用失效。静态方法由于是监视类对象,因此不存在该问题
在synchronized关键字中,有一种场景为锁粗化,意思为,如果一段代码被频繁调用执行,而这段代码中有两个以上的synchronized并且对象锁是同一个时,在JIT判断是否属于热点代码,若是,则会将两个锁合并为一个使用。
例如下代码
Java代码执行过程
- 源代码经javac编译成字节码,class文件
- 程序字节码经过
JIT环境变量进行判断,是否属于“热点代码”(多次调用的方法,或循环等),若是,走JIT编译为具体硬件处理器(如sparc、intel)机器码,若不是,则直接由解释器解释执行 - 操作系统及类库调用
public void add() {
synchronized (this) {
i++;
}
synchronized (this) {
i++;
}
}
如果JIT判断出以上代码在项目中存在多次调用,会自动将以上代码优化为
synchronized (this) {
i++;
i++;
}
还有一种情况为锁消除,同样属于JIT优化后的结果,我们都知道StringBuffer是线程安全的,其append方式就是使用了synchronized修饰
public void append() {
StringBuffer stringBuffer = new StringBuffer();
stringBuffer.append("1");
stringBuffer.append("2");
stringBuffer.append("3");
}
由于这段代码使用的是局部变量,不会存在线程安全问题,因此JIT会在编译期将其内部的synchronized关键字消除
synchronized实际会修改对象头中的一个标志位来表明当前对象状态,其加锁过程会经历 偏向锁-->轻量级锁(CAS操作修改标志位)-->重量级锁(Monitor监视器),偏向锁可以在启动时通过参数关闭
偏向锁,仅仅是第一次有用(实际就是无锁,因为没有出现资源争抢),只要当前对象出现过争抢资源时,就会变成轻量级锁。
轻量级锁,通过不断的自旋判断当前对象是否有锁,若没锁,则绑定当前线程(加锁),如果有锁就会不断的自旋进行CAS,但自旋会消耗过多的性能,达到一定次数后,该对象会升级成重量级锁,进入阻塞
# ReentrantLock(可重入锁)
ReentrantLock是Lock接口的一个实现类,具有独享,可重入特性,并且支持公平锁、非公平锁两种模式
在使用时,通过lock.lock()加锁,通过lock.unlock()解锁。
值得注意的是,在加锁的同时,会修改锁内的一个计数标志位,记录当前获取锁的次数,如果执行了两次lock.lock(),则对应的计数为2,且在释放锁时,也必须进行两次lock.unlock()解锁,否则其他线程依旧拿不到这把锁
另外,如果线程在获取锁的时候被阻塞住了,并且我们想手动中断这个被阻塞的线程,则必须使用lock.lockInterruptibly()来争抢锁,否则无法中断线程
ReentrantLock内部通过调用sync.lock()方法实现,Sync继承自AbstractQueuedSynchronizer(抽象队列同步器简称AQS)类,该类体现了设计模式中的模板方法,对锁的关键步骤进行了抽象,比如加锁,解锁,将线程放入链表中,其中尝试加锁,尝试解锁由具体的子类复写实现
在JDK提供多种锁的实现中,均用到了这个类,先来看一下Sync的子类NonfairSync(非公平锁)对lock方法的实现
final void lock() {
// 这里先尝试进行CAS操作修改锁的标志位
// 如果修改成功,则将当前线程赋予exclusiveOwnerThread属性,记录当前哪个线程获得了这把锁
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
// 修改失败,为了保证可重入性,会尝试获取锁
acquire(1);
}
在acquire方法中做了两次尝试,其方法分别是调用复写的tryAcquire()方法和将当前线程放入链表并阻塞的方法acquireQueued()
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
在tryAcquire()方法中调用了nonfairTryAcquire()方法,这个方法保证了可重入性
final boolean nonfairTryAcquire(int acquires) {
// 先获取当前线程
final Thread current = Thread.currentThread();
int c = getState();
// 如果当前线程无锁,则尝试CAS操作修改标志位,成功则保存当前线程为锁持有者
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果这把锁已经有持有者了,则判断当前线程是否是锁的持有者。
// 如果是,则累加标志位。
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 以上判断都不是则获取锁失败
return false;
}
我们可以看到,在重入锁的时候会对锁的标志位进行累加,这也验证了lock()和unlock()必须成对出现,否则会进入死循环
执行完该方法后,如果没有获取到锁,则调用acquireQueued()尝试将当前线程放入队列中并阻塞等待
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 自旋判断链表中的前一节点是否为头节点,如果是,则再尝试获取一次锁
// 如果获取到了则返回
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果依旧没获取到则将线程放入当前节点并阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
这里通过LockSupport.park(this)方式阻塞,具体代码不展示了,下面看一下解锁unlock()方法是如何实现的
在代码中可以看到,同样调用了AQS类中的方法,只不过解锁改为了release()方法,在该方法中调用了子类复写的tryRelease()方法
protected final boolean tryRelease(int releases) {
// 先计算释放后的标志位
int c = getState() - releases;
// 判断解锁的进程是否是锁持有者
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 如果标志位为0,说明没有任何一个线程持有这把锁了,因此将持有者属性设为null
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
// 如果不是0,说明这个锁有重入的线程,因此仅仅减少一层锁,但该锁的线程持有者依旧不变
setState(c);
return free;
}
由上可知,当锁完全释放后,该方法返回一个true,程序返回到release()方法中继续执行,去通知后面的线程唤醒并执行
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
// 当锁释放后,判断链表中是否还有其他线程,如果有则通过unparkh唤醒下一个节点的线程
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
Condition是ReentrantLock中的一个功能点,其作用是配合Lock使用可以达到wait/notify相同的作用,并且可以更加精确地控制唤醒某个具体线程(底层是park/unPark机制)。
假设现在有个容器,并且有两个线程,分别是生产者线程和消费者线程,当容器里没东西的时候消费者线程则进入阻塞等待,当生产者生产完东西时,唤醒消费者线程进行消费,反之当容器满的时候生产者阻塞,当有空余位置时,消费者唤醒生产者去继续生产
private final static Lock lock = new ReentrantLock();
// 生产者对应的条件
private final static Condition producerCondition = lock.newCondition();
// 消费者对应的条件
private final static Condition consumerCondition = lock.newCondition();
// 消费者如果发现容器空了,则进入对应的条件阻塞
consumerCondition.await();
// 生产者生产完东西放入容器后,唤醒消费者消费
consumerCondition.signal();
// 生产者发现容器满了,则进入对应的条件阻塞
producerCondition.await();
// 消费者消费完,通知生产者继续生产
producerCondition.signal();
注意await()会释放锁
# ReadWriteLock(读写锁)
ReadWriteLock是一个读写锁的接口,其典型实现为ReentrantReadWriteLock,读写锁实际是维护一对关联锁,一个只用于读操作,一个用于写操作,读锁是一个共享锁,可以由多个线程同时持有,而写锁是排他锁
当所有的读操作完成时,才会进行加写锁操作。适用于读取比写入多的场景,例如:缓存
在读写锁中有一个锁降级的概念,指把写锁降级为读锁。把持住当前拥有的写锁同时,再获取到读锁,随后释放写锁的过程。
我们可以看到在ReentrantReadWriteLock中,同时维护了读锁和写锁
/** Inner class providing readlock */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** Inner class providing writelock */
private final ReentrantReadWriteLock.WriteLock writerLock;
# Semaphore(信号量)
Semaphore多用来接口限流,通过给线程发放凭证来保证同一资源被访问次数,其实就是一种共享锁的一种体现,使用起来很简单
Semaphore semaphore = new Semaphore(5);
semaphore.acquire();
// todo: do something
semaphore.release();
整体来说先获取令牌 --> 执行逻辑 --> 释放令牌,其中实现也和AQS类相关,先来看一下获取令牌的操作原理
在调用acquire()方法时,实际就是调用了Sync类的acquireSharedInterruptibly(1)方法,上面我们写过,Sync继承自AbstractQueuedSynchronizer类
在acquireSharedInterruptibly方法中,开始尝试获取信号量
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取信号量,如果信号量不足时,尝试将当前线程阻塞住
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
通过调用子类复写的tryAcquireShared方法,最终调用nonfairTryAcquireShared方法,在该方法中先计算出来如果发放令牌后,信号量的剩余量
如果小于0,则不进行CAS操作,并返回
final int nonfairTryAcquireShared(int acquires) {
// 自旋判断
for (;;) {
// 先获取当前剩余信号量
int available = getState();
// 计算发放后的剩余信号量
int remaining = available - acquires;
// 当信号量不足或CAS操作成功时返回剩余信号量
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
代码返回acquireSharedInterruptibly方法继续执行,如果返回的剩余信号量小于零,则说明无信号量可用,则调用doAcquireSharedInterruptibly方法将当前线程阻塞住
在这个方法中阻塞逻辑和上面独享锁的acquireQueued()方法逻辑相似,依旧是先判断上一个节点是否是头节点,如果是则重新尝试获取信号量,否则将当前线程加入链表中阻塞,只不过链表的节点属性变为了Node.SHARED(共享锁)
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
// 获取上一个节点
final Node p = node.predecessor();
// 判断是否是头节点
if (p == head) {
// 重新尝试获取信号量
int r = tryAcquireShared(arg);
if (r >= 0) {
// 获取到后,将节点设置为头节点,并返回
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 如果不是头节点或者没获取到信号量,则调用下面方法进行阻塞线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
在这个方法中阻塞线程依旧采用的是pack/unpack机制,下面看一下,释放信号量是如何实现的
public final boolean releaseShared(int arg) {
// 先尝试释放,如果成功则唤醒后面阻塞的线程重新获取信号量
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
其中在tryReleaseShared()方法是需要在子类中进行复写,具体实现逻辑和之前的ReentrantLock相似,依旧是通过判断标志位,进行CAS操作来实现的
protected final boolean tryReleaseShared(int releases) {
for (;;) {
// 获取当前信号量
int current = getState();
// 释放后的信号量
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// CAS操作成功后返回
if (compareAndSetState(current, next))
return true;
}
}
该方法执行完后返回,releaseShared方法调用doReleaseShared来唤醒后面的线程
# CountDownLatch(计数器)
AQS另一个重要应用就是在CountDownLatch中
应用场景为:主线程开启了多个线程执行任务,并且主线程需要等所有子线程执行完任务后在,再继续执行后面的逻辑,可以使用如下代码
CountDownLatch countDownLatch = new CountDownLatch(5);
// todo: 通知子线程执行任务
// 主线程 调用await方法进行阻塞,当count为0时,自行唤醒主线程
countDownLatch.await();
// 各个子线程执行完后调用countDown()方法进行减一
countDownLatch.countDown();
其实现原理与信号量相同,都是通过调用AQS中的acquireSharedInterruptibly / releaseShared方法,进行阻塞和释放
通过CountDownLatch复写的tryAcquireShared方法,来判断当前计数是否为零
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
不是零的话返回-1 然后在acquireSharedInterruptibly方法中判断tryAcquireShared返回值,如果小于零则阻塞当前线程
在子线程在调用countDown()时,实际做了一次释放资源的操作,这个操作同样通过一层层调用,最终调用到CountDownLatch复写的tryReleaseShared方法
protected boolean tryReleaseShared(int releases) {
// 循环判断
for (;;) {
int c = getState();
// 如果为0后,返回false,可以理解为抛异常
if (c == 0)
return false;
int nextc = c-1;
// 通过CAS修改状态位,如果是0返回true,用于后续判断唤醒线程使用
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
执行完以上方法后调用doReleaseShared唤醒主线程
# CyclicBarrier(栅栏)
这个应用场景和CountDownLatch相反,通常用来多个线程并发执行
比如想要批量执行SQL,一次性执行5条SQL,那么前四个线程会将SQL缓存起来,并阻塞,当第五个线程来的时候,同时唤醒前四个线程,并执行,可以看下面的例子
public static void main(String[] args) {
// 当线程满了后,执行的回调逻辑
Runnable action = new Runnable() {
@Override
public void run() {
// todo: 批量执行SQL
System.out.println("批量执行了SQL");
}
};
CyclicBarrier cyclicBarrier = new CyclicBarrier(5,action);
Runnable task = new Runnable() {
@Override
public void run() {
try {
// todo:将SQL缓存起来
Thread.sleep(1000);
System.out.println("生成SQL完成");
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
};
for (int i = 0; i < 5; i++) {
Thread thread = new Thread(task);
thread.start();
}
}
下面看下其原理实现,在调用await()方法后会在其内部调用一个dowait()方法,该方法是CyclicBarrier的核心逻辑,我们可以分为两部份来看
首先看线程满了之后,执行回调的部分,在该方法内部,会先判断当前还可以阻塞的线程数,如果为0时,则开始调用CyclicBarrier初始化时,传入的Runnable
int index = --count;
// 判断是否还有阻塞线程的名额
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
// 如果线程已满,则开始调用Runnable
if (command != null)
command.run();
ranAction = true;
// Runnable执行完成后,唤醒当前批次阻塞的所有线程,并开始下一轮的初始化复赋值(count重新赋值)
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
唤醒当前批次所阻塞的线程操作在nextGeneration方法中,在这个方法中不但唤醒了线程,并且对下一轮的属性进行了赋值
private void nextGeneration() {
// 唤醒本轮所有线程
trip.signalAll();
// 开启下一轮
count = parties;
generation = new Generation();
}
在CyclicBarrier这个类中,我们可以看到trip其实就是一个Condition
/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();
而signalAll这个接口方法是由AbstractQueuedSynchronizer的内部类ConditionObject实现的。
同样阻塞的线程被保存到一个链表中,通过遍历链表来唤醒线程,这里的唤醒线程同样通过LockSupport.unpark()方式实现
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
以上就是CyclicBarrier控制并发执行的主流程,而对于线程未满的情况,将线程阻塞住的流程,就相对简单了
for (;;) {
try {
if (!timed)
// 如果未指定超时时间,则使用Condition接口的await()方法将线程阻塞住
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
// todo: do something
}
} finally {
lock.unlock();
}
# 线程通信
Java中线程之间的通信一般有stop、suspend / resume、wait / notify、park / unpark
# Stop(强制终止—被弃用)
stop()是过时的,不建议使用stop()是一种强制中断,并不关心当前线程状态和代码执行逻辑,一但调用,则立即终止,如果在有锁的情况下,也不会释放锁,造成死锁stop()会破坏代码的原子性逻辑,因为调用时,我们并不清楚线程执行到那行逻辑,有可能会造成数据不一致
# suspend和resume(被弃用)
线程通过调用suspend()方法阻塞,通过调用resume()唤醒
需要注意的是这种方式也容易造成死锁,造成死锁的情况有两种
- 如果在同步代码中调用
suspend()/resume()方法,线程在阻塞时并不会释放对象锁,导致无法执行resume() - 如果调用
resume()在suspend()方法之前,同样会造成死锁
例如以下两种代码,均会造成死锁
public void suspendAndResume() throws InterruptedException {
Object o = new Object();
Thread thread = new Thread(() -> {
int i = 0;
synchronized (o) {
System.out.println("开始休眠");
Thread.currentThread().suspend();
}
});
thread.start();
Thread.sleep(2000);
System.out.println("即将唤醒子线程");
// 由于suspend方法不会释放锁,因此代码到这里就会死锁
synchronized (o) {
thread.resume();
}
}
针对这种死锁的情况,推荐使用wait / notify 方式,这种方式会在阻塞时自动释放锁
第二种死锁情况,是由于在调用suspend()方法前已经调用了resume()方法
public void suspendAndResume() throws InterruptedException {
Object o = new Object();
Thread thread = new Thread(() -> {
try {
System.out.println("开始休眠");
Thread.sleep(3000);
System.out.println("准备调用suspend()");
Thread.currentThread().suspend();
System.out.println("唤醒了");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
thread.start();
Thread.sleep(1000);
System.out.println("准备调用resume()");
// 这里由于有限调用了resume(),因此当线程一旦调用suspend()方法后,在本代码中就无法唤醒了
thread.resume();
}
针对由于这种方式引起的死锁,推荐使用park \ unpark机制阻塞线程
# wait和notify
wait和notify方式是基于对象监视器来进行线程阻塞的,在调用wait()方法后,会将线程放入该对象的等待队列中,并且会主动放弃资源(锁)。当调用notify()方法时,会将一个线程从等待队列中拿出来,重新进行资源的争抢
由于wait和notify会对对象头中的锁标志位进行修改,因此他们必须在同步代码块中使用。
wait和sleep的区别
- 所属类不同
wait是来自Object类,而sleep来自Thread类 wait是基于对象监视器实现的,且必须在同步代码块中使用,会自动释放锁,而sleep可以在任意处使用,但不会释放锁
值得注意的是,虽然wait()方法会自动释放锁,但对代码的执行顺序还是有要求的,notify()方法必须在wait()后执行,否则会造成死锁,下面是死锁示例
public void waitAndNotify() throws InterruptedException {
// 这种方式对唤醒的调用顺序有要求
Thread thread = new Thread(() -> {
int i = 0;
System.out.println("线程开始执行run方法");
try {
Thread.sleep(3000);
synchronized (this) {
System.out.println("子线程开始休眠");
// 由于主线程先执行了notify()方法,因此在这里会造成死锁
this.wait();
System.out.println("子线程被唤醒");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
thread.start();
Thread.sleep(1000);
synchronized (this) {
this.notify();
System.out.println("主线程唤醒子线程");
}
}
伪唤醒
代码中不应该用if判断是否应该进入等待状态,原因是处于等待状态的线程可能会因为CPU、操作系统调度等底层原因造成伪唤醒。
因此官方建议在循环中检查等待条件,例如
while(判断等待条件) { // todo: 线程等待 }
上面的代码展示了wait和notify方式由于调用顺序问题造成死锁的情况,为了弥补这种问题,JDK提供了另外一种机制park \ unpark
# park和unpark
← 监听器、过滤器、拦截器 集合 →