今天呢!猿塔君跟大家讲:
P8架构师带你深入-Java高并发底层原理
2. Synchronized锁底层原理
2.1 synchronized原理
查看字节码:javap -c SyncDemo.class3

今晚会有我们在职一线大咖免费做Java进阶技术分享
大家有时间的可以来听听哦!
主题:【微服务架构设计之Dubbo内核剖析】
讲师:Roc / 资深架构师
课程收获:
1、掌握RPC核心原理
2、理解Socket通信机制
3、Dubbo底层架构剖析
4、基于Zookeeper实现服务上下线感知
5、基于Netty实现Dubbo服务调用

lock对象、线程A和线程B三者是一种什么关系?
1、lock对象维护了一个等待队列list;
2、线程A中执行lock的wait方法,把线程A保存到list中;
3、线程B中执行lock的notify方法,从等待队列中取出线程A继续执行;当然了,Hotspot实现不可能这么简单。

关于主内存与工作内存之间的具体交互协议,即一个变量如何从主内存拷贝到工作内存、如何从工作内存同步到主内存之间的实现细节,Java内存模型定义了以下八种操作来完成:
lock(锁定):作用于主内存的变量,把一个变量标识为一条线程独占状态。
unlock(解锁):作用于主内存变量,把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定。
read(读取):作用于主内存变量,把一个变量值从主内存传输到线程的工作内存中,以便随后的load动作使用。
load(载入):作用于工作内存的变量,它把read操作从主内存中得到的变量值放入工作内存的变量副本中。
use(使用):作用于工作内存的变量,把工作内存中的一个变量值传递给执行引擎,每当虚拟机遇到一个需要使用变量的值的字节码指令时将会执行这个操作。
assign(赋值):作用于工作内存的变量,它把一个从执行引擎接收到的值赋值给工作内存的变量,每当虚拟机遇到一个给变量赋值的字节码指令时执行这个操作。
store(存储):作用于工作内存的变量,把工作内存中的一个变量的值传送到主内存中,以便随后的write的操作。
-
write(写入):作用于主内存的变量,它把store操作从工作内存中一个变量的值传送到主内存的变量中。
2.2 wait和notify
Object对象的wait()和notify():
总结:wait释放锁,notfiy不释放锁。
2.3 Java对象结构
对象的几个部分的作用:
3.数组长度也是占用64位(8字节)的空间,这是可选的,只有当本对象是一个数组对象时才会有这个部分;
4.对象体是用于保存对象属性和值的主体部分,占用内存空间取决于对象的属性数量和类型;
5.对齐字是为了减少堆内存的碎片空间(不一定准确)。
2.4 Mark Word了解了对象的总体结构
接下来深入地了解对象头的三个部分。

lock:2位的锁状态标记位,由于希望用尽可能少的二进制位表示尽可能多的信息,所以设置了lock标记。该标记的值不同,整个Mark Word表示的含义不同。biased_lock和lock一起,表达的锁状态含义如下:
biased_lock:对象是否启用偏向锁标记,只占1个二进制位。为1时表示对象启用偏向锁,为0时表示对象没有偏向锁。lock和biased_lock共同表示对象处于什么锁状态。
1.初期锁对象刚创建时,还没有任何线程来竞争,对象的Mark Word是下图的第一种情形,这偏向锁标识位是0,锁状态01,说明该对象处于无锁状态(无线程竞争它)。 2.当有一个线程来竞争锁时,先用偏向锁,表示锁对象偏爱这个线程,这个线程要执行这个锁关联的任何代 码,不需要再做任何检查和切换,这种竞争不激烈的情况下,效率非常高。这时Mark Word会记录自己偏爱的线程的ID,把该线程当做自己的熟人。如下图第二种情形。 3.当有两个线程开始竞争这个锁对象,情况发生变化了,不再是偏向(独占)锁了,锁会升级为轻量级锁,两个线程公平竞争,哪个线程先占有锁对象并执行代码,锁对象的Mark Word就执行哪个线程的栈帧中的锁记录。如下图第三种情形。 4.如果竞争的这个锁对象的线程更多,导致了更多的切换和等待,JVM会把该锁对象的锁升级为重量级锁,这个就叫做同步锁,这个锁对象Mark Word再次发生变化,会指向一个监视器对象,这个监视器对象用集合的形式,来登记和管理排队的线程。如下图第四种情形。
这一部分用于存储对象的类型指针,该指针指向它的类元数据,JVM通过这个指针确定对象是哪个类的实例。该指针的位长度为JVM的一个字大小,即32位的JVM为32位,64位的JVM为64位。如果应用的对象过多,使用64位的指针将浪费大量内存,统计而言,64位的JVM将会比32位的JVM多耗费50%的内存。为了节约内存可以使用选项+UseCompressedOops开启指针压缩,其中,oop即ordinary object pointer普通对象指针。开启该选项后,下列指针将压缩至32位:
如果对象是一个数组,那么对象头还需要有额外的空间用于存储数组的长度,这部分数据的长度也随着JVM架构的不同而不同:32位的JVM上,长度为32位;64位JVM则为64位。64位JVM如果开启+UseCompressedOops选项,该区域长度也将由64位压缩至32位。

2.5 乐观锁和悲观锁
乐观锁(读多写少,CAS) 乐观锁(Optimistic Lock),顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在提交更新的时候会判断一下在此期间别人有没有去更新这个数据。乐观锁适用于读多写少的应用场景,这样可以提高吞吐量。乐观锁:假设不会发生并发冲突,
缺点:可能会出现的ABA(上图右)的问题,不一定会影响结果,解决办法是提供额外的标志位或者时间戳(类似于mysql version字段)。
悲观锁(读少写多、synchronized) 悲观锁(Pessimistic Lock),顾名思义,就是很悲观,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会block直到它拿到锁。悲观锁:假定会发生并发冲突,屏蔽一切可能违反数据完整性的操作。Javasynchronized 就属于悲观锁的一种实现,每次线程要修改数据时都先获得锁,保证同一时刻只有一个线程能操作数据,其他线程则会被block。

*锁优化机制:*
偏向锁:先使用偏向锁
轻量锁:获取偏向锁失败则升级为CAS轻量锁
自旋锁:获取轻量锁失败,则进行短暂的自旋
重量级锁:以上都失败则升级为重量级锁
*偏向锁:* (程序没有竞争,则取消同步操作)
*轻量级锁*(偏向锁运行在一个线程进入同步块的情况下,当第二个线程加入锁竞争用的时候,偏向锁就会升级为CAS轻量级锁)
当一个处于运行状态的线程调用了suspend()方法以后,它就会进入挂起状态(这一方法已经过时不建议使用)。挂起状态的线程也没有释放对象锁,它需要调用resume()方法以后才能恢复到可运行状态。
将线程挂起容易导致程序死锁。
AQS(AbstractQueuedSynchronizer)是一个抽象同步队列,JUC(java.util.concurrent)中很多同步锁都是基于AQS实现的。
AQS的基本原理就是当一个线程请求共享资源的时候会判断是否能够成功操作这个共享资源,如果可以就会把这个共享资源设置为锁定状态,如果当前共享资源已经被锁定了,那就把这个请求的线程阻塞住,也就是放到队列中等待。

AQS 中有一个被 volatile 声明的变量用来表示同步状态
提供了 getState() 、 setState() 和 compareAndSetState() 方法来修改 state 状态的值
// 返回同步状态的当前值
protected final int getState() {
return state;
}
// 设置同步状态的值
protected final void setState(int newState) {
state = newState;
}
// CAS操作修改state的值
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
上面说了 AQS 是 JUC 中很多同步锁的底层实现,锁也分很多种,有像 ReentrantLock 这样的独占锁,也有 ReentrantReadWriteLock 这样的共享锁,所以 AQS 中也必然是包含这两种操作方式的逻辑
1. 独占式
获取资源的时候会调用 acquire() 方法,这里面会调用 tryAcquire() 方法去设置 state 变量,如果失败的话就把当前线程放入一个 Node 中存入队列
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
2. 共享式
获取资源会调用 acquireShared() 方法,会调用 tryAcquireShared() 操作 state 变量,如果成功就获取资源,失败则放入队列
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
和独占式一样, tryAcquireShared() 和 tryReleaseShared() 也是需要子类来提供
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
3.4 条件变量Condition
public static void main(String[] args) throws InterruptedException {
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
lock.lock();
System.out.println(" t1 加锁");
System.out.println("t1 start await");
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("t1 end await");
lock.unlock();
}
});
thread1.start();
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
lock.lock();
System.out.println(" t2 加锁");
System.out.println("t2 start signal");
condition.signal();
System.out.println("t2 end signal");
lock.unlock();
}
});
thread2.start();
}
Causes the current thread to wait until another thread invokes the notify() method for this object.
CountDownLatch.await()调用,最后跟到了LockSupport.park()方法里, 这里调用的是 unsafe.park()方法来block线程。
// Hotspot implementation via intrinsics API
private static final Unsafe unsafe = Unsafe.getUnsafe();
//unsafe 用来实现底层操作private static final long parkBlockerOffset; //辅助参数,配合unsafe用的
//This object is recorded while // the thread is blocked to permit monitoring and diagnostic tools
to // identify the reasons that threads are blocked. //设置一个线程和关联的blocker对象,blocker用
来做分析,debug的private static void setBlocker(Thread t, Object arg) { // Even though volatile,hotspot doesn’t need a write barrier here. unsafe.putObject(t, parkBlockerOffset, arg); }
//block当前线程,是否真的block了取决于permit是否available
//permit相当于1,0的开关, 默认是0,调一次unpark就+1变成1了,调一次park会消费这个1又变成0了(park立即返回),
//再次调用park会变成block(因为没有1可以拿了,会等在这,直到有1),这时调用unpark会把1给回去(线程解锁返回)
//每个线程都有个相关的permit, permit最多一个,调用unpark多次也不会积累
//当为permit available时,方法会立即返回,不会block,反之就会block当前线程直到下面3件事发生
//1. 其他线程调用了unpark(此线程)
//2. 其他线程interrupts了此线程
//3. The call spuriously (that is, for no reason)returns. public static void park() { unsafe.park(false, 0L); }
//对于给定线程,让permit变得avaliable, public static void unpark(Thread thread) { if (thread !=null) unsafe.unpark(thread); }
//然后park有2个带限定时间的版本,所以一共有3个park version, 这3个version又有带blocker的debug版本public static void parkNanos(long nanos) { public static void parkUntil(long deadline) {
System.out.println("start");
LockSupport.parkNanos(1000000000);
System.out.println("end");//一开始会block线程,直到给定时间过去后才往下走
System.out.println(“start”); LockSupport.unpark(Thread.currentThread());
LockSupport.parkNanos(1000000000); System.out.println(“end”); //不会block,因为一开始给了一个permitSystem.out.println(“start”); LockSupport.unpark(Thread.currentThread());
LockSupport.unpark(Thread.currentThread()); LockSupport.parkNanos(1000000000);
System.out.println(“inter”); LockSupport.parkNanos(1000000000); System.out.println(“end”);
//第一个park不会block,第2个会,因为permit不会因为多次调用unpark就积累
4. ReentrantLock源码分析
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// c=0 说明没有其他线程占有锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
// 队列中没有其他线程在等待锁,而且CAS把state设置成入参的值成功,这里是1(这里的CAS就是我
// 们前文提的并发竞争机制),则当前线程获取锁成功并将owner线程设置为当前线程
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// 可重入设置,当前线程重复请求锁成功,只是增加请求锁的计数
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 上面这个官方注释很直白,其实下面的enq方法里也执行了这段代码,但是这里先直接试一下看能
// 否插入成功
Node pred = tail;
if (pred != null) {
node.prev = pred;
// CAS把tail设置成当前节点,如果成功的话就说明插入成功,直接返回node,失败说明有其他线程也
// 在尝试插入而且其他线程成功,如果是这样就继续执行enq方法
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
继续看enq方法:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
// 最开始head和tail都是空的,需要通过CAS做初始化,如果CAS失败,则循环重新检查tail
if (compareAndSetHead(new Node()))
tail = head;
} else {
// head和tail不是空的,说明已经完成初始化,和addWaiter方法的上半段一样,CAS修改
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
/*
* 如果前置节点是head,说明当前节点是队列第一个等待的节点,这时去尝试获取锁,如果成功了则
* 获取锁成功。这里有的同学可能没看懂,不是刚尝试失败并插入队列了吗,咋又尝试获取锁? 其实这*
* 里是个循环,其他刚被唤醒的线程也会执行到这个代码
*/
if (p == head && tryAcquire(arg)) {
// 队首且获取锁成功,把当前节点设置成head,下一个节点成了等待队列的队首
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
/* shouldParkAfterFailedAcquire方法判断如果获取锁失败是否需要阻塞,如果需要的话就执行
* parkAndCheckInterrupt方法,如果不需要就继续循环
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
下面看一下shouldParkAfterFailedAcquire方法:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取pred前置节点的等待状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
/* 前置节点状态是signal,那当前节点可以安全阻塞,因为前置节点承诺执行完之后会通知唤醒当前
* 节点
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
// 前置节点如果已经被取消了,则一直往前遍历直到前置节点不是取消状态,与此同时会修改链表关系
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 前置节点是0或者propagate状态,这里通过CAS把前置节点状态改成signal
// 这里不返回true让当前节点阻塞,而是返回false,目的是让调用者再check一下当前线程是否能
// 成功获取锁,失败的话再阻塞,这里说实话我也不是特别理解这么做的原因
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
假设前面一步返回true需要阻塞,则会调用parkAndCheckInterrupt进行阻塞
private final boolean parkAndCheckInterrupt() {
// 阻塞当前线程,监事是当前sync对象
LockSupport.park(this);
// 阻塞返回后,返回当前线程是否被中断
return Thread.interrupted();
}
park方法
public static void park(Object blocker) {
Thread t = Thread.currentThread();
// 设置当前线程的监视器blocker
setBlocker(t, blocker);
// 这里调用了native方法到JVM级别的阻塞机制阻塞当前线程
UNSAFE.park(false, 0L);
// 阻塞结束后把blocker置空
setBlocker(t, null);
}
至此,一次lock的调用就完成了,总结来说:
调用tryAcquire方法尝试获取锁,获取成功的话修改state并直接返回true,获取失败的话把当前线程加到等待队列中。
加到等待队列之后先检查前置节点状态是否是signal,如果是的话直接阻塞当前线程等待唤醒,如果不是的话判断是否是cancel状态,是cancel状态就往前遍历并把cancel状态的节点从队列中删除。如果状态是0或者propagate的话将其修改成signal。
-
阻塞被唤醒之后如果是队首并且尝试获取锁成功就返回true,否则就继续执行前一步的代码进入阻塞。
public final boolean release(int arg) {
/*
尝试释放锁如果失败,直接返回失败,如果成功并且head的状态不等于0就唤醒后面等待的节点
*/
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
我们先看tryRelease方法:
protected final boolean tryRelease(int releases) {
// 释放后c的状态值
int c = getState() - releases;
// 如果持有锁的线程不是当前线程,直接抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
// 如果c==0,说明所有持有锁都释放完了,其他线程可以请求获取锁
free = true;
setExclusiveOwnerThread(null);
}
// 这里只会有一个线程执行到这,不存在竞争,因此不需要CAS
setState(c);
return free;
}
再看看unparkSuccessor方法:
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
/*
如果状态小于0,把状态改成0,0是空的状态,因为node这个节点的线程释放了锁后续不需要做任何
操作,不需要这个标志位,即便CAS修改失败了也没关系,其实这里如果只是对于锁来说根本不需要CAS,因
为这个方法只会被释放锁的线程访问,只不过unparkSuccessor这个方法是AQS里的方法就必须考虑到多个
线程同时访问的情况(可能共享锁或者信号量这种)
*/
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
// 这段代码的作用是如果下一个节点为空或者下一个节点的状态>0(目前大于0就是取消状态)
// 则从tail节点开始遍历找到离当前节点最近的且waitStatus<=0(即非取消状态)的节点并唤醒
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
3. 结合lock方法,被唤醒的节点会自动替换当前节点成为head
(1)lock是一个接口,而synchronized是java的关键字,synchronized是内置的语言实现;
(4)通过lock可以知道有没有成功获取锁,而synchronized却无法办到
今晚会有我们在职一线大咖免费做Java进阶技术分享
大家有时间的可以来听听哦!
主题:【微服务架构设计之Dubbo内核剖析】
讲师:Roc / 资深架构师
课程收获:
1、掌握RPC核心原理
2、理解Socket通信机制
3、Dubbo底层架构剖析
4、基于Zookeeper实现服务上下线感知

END

扫码关注

