从xv6 到 java AQS的lost-wakeup思考
JAVA AQS解决线程的同步中的lost-wakeup问题
在学习xv-6中的同步。
下面代码一般业务情况是,uartWrite方法从buff里读取字符,通过isdone判断是否有字符串,如果没有字符则调用sleep阻塞等待。uartintr往buffer写字符,设置isdone=1,再调用wakeup来唤醒sleep,读取buffer字符,进行处理。
注:while(isdone==0)检查状态一般都被包在acquire(conditionLock)和 release(conditionLock);中间,因为多线程环境中,需要会不断检查状态,因为可能刚被唤醒条件满足了,却又被另外一个线程把条件消耗了,导致需要重新检查进入睡眠。
这是一个简单的生产者-消费者模型,使用wakeup和sleep来实现同步
int isdone=0;
//消费者
void uartWrite(char c,int n)
{
acquire(&uart_tx_lock);
while(isdone==0){
release(&uart_tx_lock)
//INTERRUPT(WAKEUP)
broken_sleep(&tx_chan); //放弃 cpu ,会调用sched()切换线程
acquirre(&uart_tx_lock)
}
//read buffer characters.
//doSomething.....
isdone=0;//isdone设置为0 表示消费完成
release(&uart_tx_lock);
return;
}
//生产者
void uartintr(void)
{
// send buffered characters.
acquire(&uart_tx_lock);
// doSometing
isdone=1; //isdone设置为1 表示可以消费
wakeup(&tx_chan);//设置状态,表示唤醒线程
release(&uart_tx_lock);
}
void
wakeup(void *chan)
{
for(p = proc; p < &proc[NPROC]; p++) {
if(p->state == SLEEPING && p->chan == chan) {
p->state = RUNNABLE;
}
}
}
对于共享变量isdone在uartintr 和uartWrite 都要用 acquire(&uart_tx_lock);和 release(&uart_tx_lock);来保护isdone的原子操作(uart_tx_lock称之为condition lock)
broken_sleep(&tx_chan);前要释放锁,再获取锁是为了防止睡眠的时候 uartintr 也获取锁,导致死锁。
broken_sleep(&tx_chan);后再加锁,uartWrite被唤醒了,需要做uartWrite的业务,需阻塞下一个 uartintr
lost-wakeup问题
首先uartWrite的buff并没有任何字符,会进入判断while(isdone==0),再release();
此时当uartintr传入字符后修改isdone=1,并且调用wakeup(),但是uartWrite还并没有进入sleep(),所以uartWrite是先被唤醒,再调用sched放弃cpu一直睡眠,如果没有下一次wakeup就永远不可能继续执行了。
意思是: uartWrite的release(&uart_tx_lock) -> sched() 不是原子的,或者导致了说先被唤醒,再线程睡眠,线程不可能醒来了
xv6如何解决:
修改上面的wakeup和sleep函数,用一把锁(p->lock)来保护release->sched为原子的操作
int isdone=0;
void uartWrite(char c,int n)
{
acquire(&uart_tx_lock);
while(isdone==0){
// RIGHT-HERE -- INTERRUPT(WAKEUP)
sleep(&tx_chan,&uart_tx_lock); //release和sech,放弃cpu和切换线程为原子操作
}
//read buffer characters.
//doSomething.....
isdone=0;
release(&uart_tx_lock);
return;
}
void
uartintr(void)
{
// send buffered characters.
acquire(&uart_tx_lock);
// doSometing
isdone=0;
wakeup(&tx_chan);
release(&uart_tx_lock);
}
void
sleep(void *chan, struct spinlock *lk)
{
struct proc *p = myproc();
if(lk != &p->lock){ //DOC: sleeplock0
acquire(&p->lock); //DOC: sleeplock1
//上面的release(&uart_tx_lock),释放condition锁
release(lk);
}
// Go to sleep.
p->chan = chan;
p->state = SLEEPING;
sched();
// Tidy up.
p->chan = 0;
// Reacquire original lock.
if(lk != &p->lock)
release(&p->lock);
//重新获取condition锁
acquire(lk);
}
}
void
wakeup(void *chan)
{
struct proc *p;
for(p = proc; p < &proc[NPROC]; p++) {
acquire(&p->lock);
if(p->state == SLEEPING && p->chan == chan) {
p->state = RUNNABLE;
}
release(&p->lock);
}
}
AQS
java可以使用AQS的Condition对象来实现类似于上面的同步操作
public static void main(String[] args) throws InterruptedException {
Lock lock=new ReentrantLock();
Condition condition = lock.newCondition();
AtomicInteger flag= new AtomicInteger(0);
Thread t1 = new Thread(() -> {
lock.lock();
try {
while (flag.get() == 0) {
System.out.println("condition.await()");
condition.await(); //await -> lock.unlock()-> flag==0(又被别的线程设置为0) ->await
}
flag.set(0);
System.out.println("condition.wakeup");
}catch (Exception ex){
}finally {
lock.unlock();
}
});
Thread t2 = new Thread(() -> {
lock.lock();
try {
Thread.sleep(2000);
flag.set(1);
System.out.println("condition.signal()");
condition.signal();
}catch (Exception ex){
}finally {
lock.unlock();
}
});
t1.start();
t2.start();
t1.join();
t2.join();
}
AQS的await()
t1,t2都调用lock.lock()为了解决死锁,和xv6的sleep一样在进入await里面首先会释放锁
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//加入同步队列
Node node = addConditionWaiter();
//释放condition锁
int savedState = fullyRelease(node);
int interruptMode = 0;
//1判断是否已经被唤醒了
while (!isOnSyncQueue(node)) {
//2park会判断是否唤醒,如果唤醒立即返回
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//获取condition锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
AQS有两个队列,同步队列(SyncQueue)和条件队列(ConditionQueue),
调用await,线程状态设置为SLEEP,放弃cpu会进入ConditionQueue放弃cpu;
调用signal,线程状态设置为RUNNABLE会进入SyncQueue等待分配cpu;
await()
- 构造线程节点加入ConditionQueue
- 释放condition锁
- 检查是否在同步队列上
AQS解决 lost-wakeup的两个检查
-
isOnSyncQueue(node)判断:会一直检查是否在SyncQueue,如果线程1的await调用fullyRelease之后,切换线程2调用signal()把线程移到SyncQueue,再切换await,此时不需要放弃cpu。直接可以结束等待
-
LockSupport.park()的安全性:如果线程1的await调用fullyRelease后,再isOnSyncQueue后才切换线程2调用signal(),因为signal会调用unpark,线程1调用park发现已经掉过unpark,会立即返回不会睡眠。
park/unpark 原理
每个线程独立维护自己的 _counter(在 Parker 中)
_counter 值 |
含义 |
|---|---|
0 |
当前线程没有许可证,需要阻塞等待 |
1 |
当前线程已有许可证,可以直接返回 |
如果没有 _counter:
unpark(t)在t进入park()之前就执行了t还没开始阻塞,就错过了唤醒信号- 于是
t永远阻塞,死锁发生 ⚠️(lost wake-up)
有了 _counter:
unpark(t)会设置t._counter = 1- 当
t调用park()时,先检查_counter == 1 - 发现已有“许可证”,直接返回,不阻塞
void Parker::park(bool isAbsolute, jlong time) {
//Atomic::xchg: 原子性的把两个值交换,返回旧值;用来防止lost-wakeup
if (Atomic::xchg(&_counter, 0) > 0) {
// 已被 unpark 过,不需阻塞
return;
}
pthread_mutex_lock(&_mutex);
while (_counter == 0) {
if (isAbsolute || time > 0) {
pthread_cond_timedwait(&_cond, &_mutex, &abstime);
} else {
pthread_cond_wait(&_cond, &_mutex);
}
}
_counter = 0;
pthread_mutex_unlock(&_mutex);
}
void Parker::unpark() {
int s = Atomic::xchg(&_counter, 1);
if (s < 1) {
// 如果线程已阻塞,唤醒它
pthread_mutex_lock(&_mutex);
pthread_cond_signal(&_cond); // 唤醒正在等待的线程
pthread_mutex_unlock(&_mutex);
}
}