Scala并发编程实战 3:Condition 条件变量

Scala并发编程实战:Lock 锁中我们了解到如何通过Lock来实现互斥操作,但是获取锁之后,如果发现条件不满足(如消费一个队列中的数据时,发现队列为空),线程要如何等待条件满足(如队列不为空)并让出锁呢?这需要用到Condition条件变量,又称作条件队列。

Condition与JDK内置的管程的等待队列功能类似,主要功能都是让线程在队列上等待被唤醒,但是内置管程的锁只能有一个对应的队列,而一个Lock锁可以有多个Condition队列。

Condition接口的定义如下:

1
2
3
4
5
6
7
8
9
public interface Condition {
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
}

Condition的await,signal,signalAll方法分别对应Object类的wait,notify,notifyAll方法。

下面这个栗子就是实现一个线程安全的有界队列,队列满的时候,让入队操作的线程wait,队列空的时候就让出队操作的线程wait,当队列情况变化的时候,又能及时唤醒wait状态的线程。
队列实现类中创建了两个Condition条件变量notFull和notEmpty。
当put元素进入队列时,如果队列已满,则需要等待notFull条件,即在notFull的等待队列等候;
当从队列take元素时,如果队列为空,则需要等待notEmpty条件,即在notEmpty的等待队列等候。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
 object ConditionDemo {
def main(args: Array[String]): Unit = {
val queue = new BoundedBuffer
for (i <- 1 to 10) {
new Thread {
override def run(): Unit = {
queue.put(s"Item$i")
}
}.start()
}
for (i <- 1 to 10) {
new Thread {
override def run(): Unit = {
queue.take
}
}.start()
}
}

class BoundedBuffer {
final val lock = new ReentrantLock
final val notFull = lock.newCondition
final val notEmpty = lock.newCondition
final val items = new Array[Any](3)
var putptr = 0
var takeptr = 0
var count = 0

def put(x: Any): Unit = {
lock.lock()
try {
while (count == items.length) {
notFull.await()
}
items(putptr) = x
putptr += 1
if (putptr == items.length) {
putptr = 0
}
count += 1
println(s"put $x")
notEmpty.signal()
} finally {
lock.unlock()
}
}

def take: Any = {
lock.lock()
try {
while (count == 0) {
notEmpty.await()
}
val x = items(takeptr)
takeptr += 1
if (takeptr == items.length) {
takeptr = 0
}
count -= 1
println(s"take $x")
notFull.signal()
x
} finally {
lock.unlock()
}
}
}

}

output:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
put Item8
put Item10
put Item5
take Item8
put Item7
take Item10
put Item9
take Item5
put Item1
take Item7
put Item4
take Item9
put Item2
take Item1
put Item6
take Item4
put Item3
take Item2
take Item6
take Item3

本文代码

Github仓库