我是靠谱客的博主 结实胡萝卜,这篇文章主要介绍ReentranLock源码分析一、简介二、源码分析三、使用实例,现在分享给大家,希望可以做个参考。

一、简介

1.1 数据结构

ReetrantLock的底层是借助AbstractQueuedSynchronized实现,所以其数据结构依附于AbstractQueuedSynchronized的数据结构。

1.2 继承关系

复制代码
1
2
public class ReentrantLock implements Lock, java.io.Serializable

ReetranLock实现了Lock接口,Lock接口中定义了lock与unlock相关操作,并且还存在newCondition方法,表示生成一个条件。

1.3 原理

ReentrantLock主要利用CAS+AQS队列来实现,它支持公平所和非公平锁,两者的实现类似:

先通过CAS尝试获取锁,如果此时已经有线程占据了锁,那就加入AQS队列并且被挂起。当锁释放之后,排在CLH队列队首的线程会被唤醒,然后CAS再次尝试获取锁。在这个时候:

  • 非公平锁:如果同时还有另一个线程来尝试获取,那么有可能会让这个线程抢先获取
  • 公平锁:如果同时还有另一个线程来尝试获取,当它发现自己不是在队首的话,就会排到队尾,由队首的线程获取到锁。

1.4 AQS

AQS是一个用于构建锁和同步容器的框架。concurrent包内许多类都是基于AQS构建,例如ReentrantLock,Semphore,CountDownLatch等,AQS解决了在实现同步容器时设计的大量细节问题。
在这里插入图片描述
AQS使用一个FIFO的队列表示排队等待锁的线程,队列头节点称为”哨兵节点“或”哑节点“,它不与任何线程关联。其他的节点与等待线程关联,每个节点维护一个等待状态waitStatus。

1.5 使用示例

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
private Lock lock = new ReentrantLock(); public void test(){ lock.lock(); try{ doSomeThing(); }catch (Exception e){ // ignored }finally { lock.unlock(); } }

二、源码分析

2.1 内部类

ReetrantLock总共有三个内部类,并且三个内部类是紧密相关的,三个类的关系如下:
在这里插入图片描述
说明:ReentrantLock类内部总共存在Sync、NonfairSync、FairSync三个类,NonfairSync与FairSync类继承自Sync类,Sync类继承自AbstractQueuedSynchronizer抽象类。下面逐个进行分析。

2.2 Sync类

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
//使用ASQ state来指示这个锁被持有了多少次 abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; //获取锁 abstract void lock(); //非公平方式获取 final boolean nonfairTryAcquire(int acquires) { //当前线程 final Thread current = Thread.currentThread(); //获取状态 int c = getState(); if (c == 0) {//表示没有线程正在竞争该锁 //CAS设置成功,状态0表示锁没有被占用 if (compareAndSetState(0, acquires)) { //设置当前线程独占 setExclusiveOwnerThread(current); return true; } } //当前线程拥有该锁 else if (current == getExclusiveOwnerThread()) { //增加重入次数 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); //设置状态 setState(nextc); return true; } return false; } //试图在共享模式下获取对象状态,此方法应该查询是否允许它在共享模式下获取对象状态,如果允许,则获取它 protected final boolean tryRelease(int releases) { int c = getState() - releases; //当前线程不为独占线程 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } //判断资源是否被当前线程占有 protected final boolean isHeldExclusively() { // While we must in general read state before owner, // we don't need to do so to check if current thread is owner return getExclusiveOwnerThread() == Thread.currentThread(); } final ConditionObject newCondition() { return new ConditionObject(); } // Methods relayed from outer class //返回资源的占用线程 final Thread getOwner() { return getState() == 0 ? null : getExclusiveOwnerThread(); } final int getHoldCount() { return isHeldExclusively() ? getState() : 0; } final boolean isLocked() { return getState() != 0; } /** * Reconstitutes the instance from a stream (that is, deserializes it). */ private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); setState(0); // reset to unlocked state } }

Sync类方法和作用如下:

方法作用
lock锁定,并未实现,留给具体子类实现
nonfairTryAcquire非公平方式获取
tryRelease试图在共享模式下获取对象状态,此方法应该查询是否允许它在共享模式下获取对象状态,如果允许,则获取它
isHeldExclusively判断资源是否被当前线程占有
newCondition新生一个条件
getOwner返回占有资源的线程
getHoldCount返回状态
isLocked资源是否被占用
readObject自定义反序列化逻辑

2.3 NonfairSync类

NonfairSync类继承了Sync类,采用非公平策略获取锁,其实现了Sync类中抽象的lock方法,具体实现如下:

首先用一个CAS操作,判断state是否为0(表示当前锁未被占用),如果是0则把它置为1,并且设置当前线程为该锁的独占线程,表示获取锁成功。当多个线程同时尝试占用同一个锁时,CAS操作只能保证一个线程操作成功,剩下的线程就进入等待队列。

“非公平性”也就体现在这里,如果占用锁的线程刚释放锁,state置为0,而排队等待锁的线程还未唤醒时,新来的线程就直接抢占了该锁,那么就“插队“了

复制代码
1
2
3
4
5
6
7
8
9
10
// 获得锁 final void lock() { if (compareAndSetState(0, 1)) // 把当前线程设置独占锁 setExclusiveOwnerThread(Thread.currentThread()); else // 锁已经被占用,或者set失败 // 以独占模式获取对象,忽略中断 acquire(1); }

若当前有三个线程去竞争锁,假设线程A的CAS操作成功了,拿到了锁返回,那么线程B和C则设置state失败,进入else,执行acquire方法

2.3.1 acquire在AQS中实现

复制代码
1
2
3
4
5
6
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }

2.3.1.1 acquire第一步:尝试获取锁,如果获取锁成功,方法直接返回

tryAcquire的实现

复制代码
1
2
3
4
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }

这里调用了nonfairTryAcquire

nonfairTryAcquire的实现

整个流程是:
检查state字段,若为0,表示锁未被占用,那么尝试占用,若不为0,检查当前锁是否被自己占用,若被自己占用,则更新state字段,表示重入锁的次数,如果以上两点都没有成功,则获取锁失败,返回false

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/** * Performs non-fair tryLock. tryAcquire is implemented in * subclasses, but both need nonfair try for trylock method. */ final boolean nonfairTryAcquire(int acquires) { //获取当前线程 final Thread current = Thread.currentThread(); //获取state变量 int c = getState(); //没有线程占用锁的情况 if (c == 0) { //占用锁成功,设置独占线程为当前线程 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //当前线程已经占用该锁 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); //更新state值为新的重入次数 setState(nextc); return true; } //获取锁失败 return false; }

2.3.1.2 acquire第二步:入队

由上面可知,线程A已经占用了锁,所以B和C执行tryAcquire失败,并且进入等待队列。

addWaiter实现:

addWaiter函数是在AQS中实现的

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//将新节点和当前线程关联并且入队列 private Node addWaiter(Node mode) { //初始化节点,设置关联线程和模式(独占or共享) Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; //尾节点不为空,说明队列已经初始化过 if (pred != null) { node.prev = pred; //设置新节点为尾节点 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //尾节点为空,说明队列还未初始化,需要初始化head节点并入对新节点 enq(node); return node; }

此时B、C线程同时尝试入队列,由于队列尚未初始化,tail==null,故至少会有一个线程走到enq(node)。我们假设这两个线程同时走到了enq(node)里。

2.1.2.5 enq

这里体现了经典的自旋+CAS操作来实现非阻塞的原子操作。由于compareAndSetHead的实现使用了unsafe类提供的CAS操作,所以只有一个线程会创建head节点成功。假设线程B成功,之后B、C开始第二轮循环,此时tail不为空,两个线程都走到else里面。假设B线程compareAndSetTail成功,那么B就可以返回了,C由于入队失败还需要第三轮循环。最终所有线程都可以成功入队

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private Node enq(final Node node) { //开始自旋 for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }

当B、C入队后,此时AQS队列如下:
在这里插入图片描述

2.3.1.2 acquire第二步:挂起

此时B和C相继执行acquireQueued(final Node node, int arg)。这个方法让已经入队的线程尝试获取锁,若失败则会被挂起

acquireQueued实现:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
//已经入队的线程尝试获取锁 final boolean acquireQueued(final Node node, int arg) { //表示是否成功获取锁 boolean failed = true; try { //标记线程是否被中断过 boolean interrupted = false; for (;;) { //获取前驱节点 final Node p = node.predecessor(); //如果前驱结点是head,即该结点已经成为老二,那么便有资格去尝试获取锁 if (p == head && tryAcquire(arg)) { //获取成功,将当前结点设置为head结点 setHead(node); //原head结点出队,在某个时间点被GC回收 p.next = null; // help GC //获取成功 failed = false; //返回是否被中断过 return interrupted; } //判断获取失败后是否可以挂起,若可以挂起 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //线程若被中断,设置interrupted为true interrupted = true; } } finally { if (failed) cancelAcquire(node); } }

假设B和C在竞争锁的过程中A一直持有锁,那么它们的tryAcquire操作都会失败,因此会走到第二个if语句中。接下来看下shouldParkAfterFailedAcquire和parkAndCheckInterrupt都做了哪些事

shouldParkAfterFailedAcquire实现:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
//判断当前线程获取锁失败之后是否需要挂起 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //前驱结点的状态 int ws = pred.waitStatus; //前驱结点状态为signal,返回true if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; //前驱结点状态为CANCELLED if (ws > 0) { //从队尾向前寻找第一个状态不为CANCELLED的节点 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ //将前驱结点的状态设置为SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }

parkAndCheckInterrupt实现:

复制代码
1
2
3
4
5
6
//挂起当前线程,返回线程中断状态并重置 private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }

线程入队后能够挂起的前提是,它的前驱结点的状态为SIGNAL,它的含义是”Hi,前面的兄弟,如果你获取锁并且出队后,记得把我唤醒!“。所以shouldParkAfterFailedAcquired会先判断当前结点的前驱结点是否状态符合要求,若符合则返回true,然后调用parkAndCheckInterrupt,将自己挂起。如果不符合,再看前驱结点是否>0(CANCELLED),若是那么向前遍历直到找到第一个符合要求的前驱,若不是则将前驱结点的状态设置为SIGNAL。

整个流程中,如果前驱结点的状态不是SIGNAL,那么自己就不能安心挂起,需要去找个安心的挂起点,同时可以再尝试下看有没有机会去竞争锁。

最终队列可能如下图所示:

在这里插入图片描述

2.3.2 unlock()

复制代码
1
2
3
4
public void unlock() { sync.release(1); }
复制代码
1
2
3
4
5
6
7
8
9
10
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }

解锁的过程相比加锁容易很多。流程大致为先尝试释放锁,若释放成功,那么查看头节点的状态是否为SIGNAL,如果是则唤醒头结点的下个结点关联的线程,如果释放失败那么返回false表示解锁失败。这里我们可以发现,每次都只唤起头节点的下一个结点关联的线程。

最后看下tryRelease的执行过程:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }

这里入参为1,tryRelease的过程为:当前释放锁的线程若不持有锁,则抛出异常,若持有锁,计算释放后的state值是否为0,若为0表示锁已经被成功释放,并且清空独占线程,最后更新state值,返回free

以下用一张流程图总结了非公平锁的获取锁的过程
在这里插入图片描述

2.4 FairSyn类

公平锁和非公平锁的不同之处在于,公平锁在获取锁的时候,不会先去检查state状态,而是直接执行acquire(1)

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// 公平锁 static final class FairSync extends Sync { // 版本序列化 private static final long serialVersionUID = -3000897897090466540L; final void lock() { // 以独占模式获取对象,忽略中断 acquire(1); } // 尝试公平获取锁 protected final boolean tryAcquire(int acquires) { // 获取当前线程 final Thread current = Thread.currentThread(); // 获取状态 int c = getState(); if (c == 0) { // 状态为0 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { // 不存在已经等待更久的线程并且比较并且设置状态成功 // 设置当前线程独占 setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 状态不为0,即资源已经被线程占据 // 下一个状态 int nextc = c + acquires; if (nextc < 0) // 超过了int的表示范围 throw new Error("Maximum lock count exceeded"); // 设置状态 setState(nextc); return true; } return false; } }

跟踪lock方法的源码可知,当资源空闲时,它总是会先判断sync队列(AbstractQueuedSynchronizer中的数据结构)是否有等待时间更长的线程,如果存在,则将该线程加入到等待队列的尾部,实现了公平获取原则。其中,FairSync类的lock的方法调用如下,只给出了主要的方法。
在这里插入图片描述
可以看出只要资源被其他线程占用,该线程就会添加到sync queue中的尾部,而不会先尝试获取资源。这也是和Nonfair最大的区别,Nonfair每一次都会尝试去获取资源,如果此时该资源恰好被释放,则会被当前线程获取,这就造成了不公平的现象,当获取不成功,再加入队列尾部。

三、使用实例

3.1 公平锁

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class test { public static void main(String[] args) throws Exception { Lock lock=new ReentrantLock(true); MyThread t1=new MyThread("t1",lock); MyThread t2=new MyThread("t2",lock); MyThread t3=new MyThread("t3",lock); t1.start(); t2.start(); t3.start(); } } class MyThread extends Thread{ private Lock lock; public MyThread(String name,Lock lock){ super(name); this.lock=lock; } public void run(){ lock.lock(); try{ System.out.println(Thread.currentThread()+" running"); try{ Thread.sleep(500); }catch (InterruptedException e) { e.printStackTrace(); } }finally { lock.unlock(); } } }

运行结果(某一次):

复制代码
1
2
3
4
Thread[t1,5,main] running Thread[t2,5,main] running Thread[t3,5,main] running

该示例使用的是公平策略,由结果可知,可能回存在如下一种时序:
在这里插入图片描述
说明:首先,t1线程的lock操作 -> t2线程的lock操作 -> t3线程的lock操作 -> t1线程的unlock操作 -> t2线程的unlock操作 -> t3线程的unlock操作。根据这个时序图来进一步分析源码的工作流程。

  1. t1线程执行lock.lock,下图给出了方法调用中的主要方法
    在这里插入图片描述
    说明:由调用流程可知,t1线程成功获取了资源,可以继续执行
  2. t2线程执行lock.lock,下图给出了方法调用中的主要方法
    在这里插入图片描述
    说明:由上图可知,最后的结果是t2线程会被禁止,因为调用了LockSupport.park
  3. t3线程执行lock.lock,下图给出了方法调用中的主要方法
    在这里插入图片描述
  4. t1线程调用了lock.unlock,下图给出了方法调用中的主要方法
    在这里插入图片描述
    说明:如上图所示,最后,head的状态会变为0,t2线程会被unpark,即t2线程可以继续运行。此时t3线程还是被禁止。
  5. t2获得CPU资源,继续运行,由于t2之前被park了,现在需要恢复之前的状态,下图给出了方法调用中的主要方法
    在这里插入图片描述
    说明:在setHead函数中会将head设置为之前head的下一个结点,并且将pre域与thread域都设置为null,在acquireQueued返回之前,sync queue就只有两个结点了。
  6. t2执行lock.unlock,下图给出了方法调用中的主要方法。

在这里插入图片描述
说明:由上图可知,最终unpark t3线程,让t3线程可以继续运行。
7. t3线程获取cpu资源,恢复之前的状态,继续运行。
在这里插入图片描述
8. t3执行lock.unlock,下图给出了方法调用中的主要方法
在这里插入图片描述
说明:最后的状态和之前的状态是一样的,队列中有一个空节点,头结点为尾节点均指向它。

参考

最后

以上就是结实胡萝卜最近收集整理的关于ReentranLock源码分析一、简介二、源码分析三、使用实例的全部内容,更多相关ReentranLock源码分析一、简介二、源码分析三、使用实例内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(80)

评论列表共有 0 条评论

立即
投稿
返回
顶部