Java之ReentrantLock

非常好的一篇文章:(并发编程网)

http://ifeve.com/%E9%80%8F%E8%BF%87reentrantlock%E7%AA%A5%E6%8E%A2aqs/

首先理解一下wait和notify

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class WaitThread extends Thread {
private volatile boolean fire = false;

@Override
public void run() {
try {
synchronized (this) {
while (!fire) {
wait();
}
}
System.out.println("fired");
} catch (InterruptedException e) {
}
}

public synchronized void fire() {
this.fire = true;
notify();
}

public static void main(String[] args) throws InterruptedException {
WaitThread waitThread = new WaitThread();
waitThread.start();
Thread.sleep(1000);
System.out.println("fire");
waitThread.fire();
}
}

示例代码中有两个线程,一个是主线程,一个是WaitThread,协作的条件变量是fire,WaitThread等待该变量变为true,在不为true的时候调用wait,主线程设置该变量并调用notify。

两个线程都要访问协作的变量fire,容易出现竞态条件,所以相关代码都需要被synchronized保护。实际上,wait/notify方法只能在synchronized代码块内被调用,如果调用wait/notify方法时,当前线程没有持有对象锁,会抛出异常java.lang.IllegalMonitorStateException。

你可能会有疑问,如果wait必须被synchronzied保护,那一个线程在wait时,另一个线程怎么可能调用同样被synchronzied保护的notify方法呢?它不需要等待锁吗?我们需要进一步理解wait的内部过程,虽然是在synchronzied方法内,但调用wait时,线程会释放对象锁,wait的具体过程是:

  • 把当前线程放入条件等待队列,释放对象锁,阻塞等待,线程状态变为WAITING或TIMED_WAITING
  • 等待时间到或被其他线程调用notify/notifyAll从条件队列中移除,这时,要重新竞争对象锁
    (1) 如果能够获得锁,线程状态变为RUNNABLE,并从wait调用中返回
    (2) 否则,该线程加入对象锁等待队列,线程状态变为BLOCKED,只有在获得锁后才会从wait调用中返回
    线程从wait调用中返回后,不代表其等待的条件就一定成立了,它需要重新检查其等待的条件,一般的调用模式是:
    1
    2
    3
    4
    5
    synchronized (obj) {
    while (条件不成立)
    obj.wait();
    ... // 执行条件满足后的操作
    }

基于wait和notify实现的生产者和消费者(局限:只能有一个条件等待队列,分析等待条件也很复杂。在生产者/消费者模式中,其实有两个条件,一个与队列满有关,一个与队列空有关。)

https://www.cnblogs.com/swiftma/p/6421803.html

基于ReentrantLock实现(使用显式锁,可以创建多个条件等待队列)

https://www.cnblogs.com/swiftma/p/6528219.html

一篇很好的文章,介绍AQS的

https://juejin.im/post/5c11d6376fb9a049e82b6253

基于AQS

AQS内部维护了一个sync queue来管理锁,线程会首先尝试获取锁,如果获取失败,就将线程封装成Node放入到这个队列中,当前节点head的后继节点会尝试不断获取锁,如果失败会阻塞自己,直到自己被唤醒。而当持有锁的线程释放锁时,会唤醒队列中的后继线程。

如果一个线程调用了Condition.await()方法,那么该线程将会释放锁、构造成节点加入等待队列并进入等待状态。事实上,节点的定义复用了同步器中节点的定义,也就是说,同步队列和等待队列中节点类型都是同步器的静态内部类AbstractQueuedSynchronizer.Node。

在Object的监视器模型上,一个对象拥有一个同步队列和等待队列,而并发包中的
Lock(更确切地说是同步器)拥有一个同步队列和多个等待队列。

CountDownLatch、Semphore、CyclicBarrier、ReentrantLock都是基于AQS的。

AQS内部有个内部类ConditionObject,ConditionObject是同步器AbstractQueuedSynchronizer的内部类,因为Condition的操作需要获取相关联的锁,所以作为同步器的内部类也较为合理。每个Condition对象都包含着一个队列(以下称为等待队列),该队列是Condition对象实现等待/通知功能的关键。

公平锁实现原理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
* Sync object for fair locks
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;

final void lock() {
acquire(1);
}

/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 获取state
int c = getState();
// state=0表示当前队列中没有线程被加锁
if (c == 0) {
/*
* 首先判断是否有前继结点,如果没有则当前队列中还没有其他线程;
* 设置状态为acquires,即lock方法中写死的1(这里为什么不直接setState?因为可能同时有多个线程同时在执行到此处,所以用CAS来执行);
* 设置当前线程独占锁。
*/
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
/*
* 如果state不为0,表示已经有线程独占锁了,这时还需要判断独占锁的线程是否是当前的线程,原因是由于ReentrantLock为可重入锁;
* 如果独占锁的线程是当前线程,则将状态加1,并setState;
* 这里为什么不用compareAndSetState?因为独占锁的线程已经是当前线程,不需要通过CAS来设置。
*/
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

ReentrantLock非公平实现原理

AQS获取独占锁的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

该方法主要工作如下:

  1. 尝试获取独占锁;
  2. 获取成功则返回,否则执行步骤3;
  3. addWaiter方法将当前线程封装成Node对象,并添加到队列尾部;
  4. 自旋获取锁,并判断中断标志位。如果中断标志位为true,执行步骤5,否则返回;
  5. 设置线程中断。

AQS的addWaiter实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private Node addWaiter(Node mode) {
// 根据当前线程创建一个Node对象
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
// 判断tail是否为空,如果为空表示队列是空的,直接enq
if (pred != null) {
node.prev = pred;
// 这里尝试CAS来设置队尾,如果成功则将当前节点设置为tail,否则enq
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

该方法就是根据当前线程创建一个Node,然后添加到队列尾部。

AQS的enq方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private Node enq(final Node node) {
// 重复直到成功
for (;;) {
Node t = tail;
// 如果tail为null,则必须创建一个Node节点并进行初始化
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
// 尝试CAS来设置队尾
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

AQS的acquireQueued方法

该方法的功能是循环的尝试获取锁,直到成功为止,最后返回中断标志位。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
// 中断标志位
boolean interrupted = false;
for (;;) {
// 获取前继节点
final Node p = node.predecessor();
// 如果前继节点是head,则尝试获取
if (p == head && tryAcquire(arg)) {
// 设置head为当前节点(head中不包含thread)
setHead(node);
// 清除之前的head
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果p不是head或者获取锁失败,判断是否需要进行park
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

自定义一个同步组件TwinsLock

该工具在同一时刻,只允许至多两个线程同时访问,超过两个线程的访问将被阻塞,我们将这个同步工具命名为TwinsLock。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
/**
*
*/
package chapter05;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
* 10-10
*/
public class TwinsLock implements Lock {
private final Sync sync = new Sync(2);

private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -7889272986162341211L;

Sync(int count) {
if (count <= 0) {
throw new IllegalArgumentException("count must large than zero.");
}
setState(count);
}
//共享模式下需要实现这个,独占模式需要实现tryAcquire()
public int tryAcquireShared(int reduceCount) {
for (;;) {
int current = getState();
int newCount = current - reduceCount;
if (newCount < 0 || compareAndSetState(current, newCount)) {
return newCount;
}
}
}

public boolean tryReleaseShared(int returnCount) {
for (;;) {
int current = getState();
int newCount = current + returnCount;
if (compareAndSetState(current, newCount)) {
return true;
}
}
}

final ConditionObject newCondition() {
return new ConditionObject();
}
}

public void lock() {
sync.acquireShared(1);
}

public void unlock() {
sync.releaseShared(1);
}

public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

public boolean tryLock() {
return sync.tryAcquireShared(1) >= 0;
}

public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
}

@Override
public Condition newCondition() {
return sync.newCondition();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
*
*/
package chapter05;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import chapter04.SleepUtils;

/**
* 10-11
*/
public class TwinsLockTest {
public static void main(String[] args) {
test();
}


public static void test() {
final Lock lock = new TwinsLock();
class Worker extends Thread {
public void run() {
while (true) {
lock.lock();
try {
SleepUtils.second(1);
System.out.println(Thread.currentThread().getName());
SleepUtils.second(1);
} finally {
lock.unlock();
}
}
}
}
// 启动10个线程
for (int i = 0; i < 10; i++) {
Worker w = new Worker();
w.setDaemon(true);
w.start();
}
// 每隔1秒换行
for (int i = 0; i < 10; i++) {
SleepUtils.second(1);
System.out.println();
}
}
}

会两个两个的。。。Thread-4
Thread-7,可以看到线程名称成对输出,也就是在同一时刻只有两个线程能够获
取到锁,这表明TwinsLock可以按照预期正确工作。

image

参考

https://www.jianshu.com/p/fadac70b2b1c

-------------本文结束感谢您的阅读-------------