从xv6 到 java AQS的lost-wakeup思考

JAVA AQS解决线程的同步中的lost-wakeup问题

在学习xv-6中的同步。

下面代码一般业务情况是,uartWrite方法从buff里读取字符,通过isdone判断是否有字符串,如果没有字符则调用sleep阻塞等待。uartintr往buffer写字符,设置isdone=1,再调用wakeup来唤醒sleep,读取buffer字符,进行处理。

注:while(isdone==0)检查状态一般都被包在acquire(conditionLock)和 release(conditionLock);中间,因为多线程环境中,需要会不断检查状态,因为可能刚被唤醒条件满足了,却又被另外一个线程把条件消耗了,导致需要重新检查进入睡眠。

这是一个简单的生产者-消费者模型,使用wakeup和sleep来实现同步

int isdone=0;

//消费者
void uartWrite(char c,int n)
{
  acquire(&uart_tx_lock);
  while(isdone==0){
      release(&uart_tx_lock)
       //INTERRUPT(WAKEUP)
      broken_sleep(&tx_chan); //放弃 cpu ,会调用sched()切换线程
      acquirre(&uart_tx_lock)
  }
   //read buffer characters.
   //doSomething.....
   isdone=0;//isdone设置为0 表示消费完成
   release(&uart_tx_lock);
   return;
}

//生产者
void uartintr(void)
{
  // send buffered characters.
  acquire(&uart_tx_lock);
  // doSometing
  isdone=1; //isdone设置为1 表示可以消费
  wakeup(&tx_chan);//设置状态,表示唤醒线程
  release(&uart_tx_lock);
}

void
wakeup(void *chan)
{
 
  for(p = proc; p < &proc[NPROC]; p++) {
    if(p->state == SLEEPING && p->chan == chan) {
      p->state = RUNNABLE;
    }
  }
}

对于共享变量isdone在uartintr 和uartWrite 都要用 acquire(&uart_tx_lock);和 release(&uart_tx_lock);来保护isdone的原子操作(uart_tx_lock称之为condition lock)

broken_sleep(&tx_chan);前要释放锁,再获取锁是为了防止睡眠的时候 uartintr 也获取锁,导致死锁。

broken_sleep(&tx_chan);后再加锁,uartWrite被唤醒了,需要做uartWrite的业务,需阻塞下一个 uartintr

lost-wakeup问题

首先uartWrite的buff并没有任何字符,会进入判断while(isdone==0),再release();

此时当uartintr传入字符后修改isdone=1,并且调用wakeup(),但是uartWrite还并没有进入sleep(),所以uartWrite是先被唤醒,再调用sched放弃cpu一直睡眠,如果没有下一次wakeup就永远不可能继续执行了。

意思是: uartWrite的release(&uart_tx_lock) -> sched() 不是原子的,或者导致了说先被唤醒,再线程睡眠,线程不可能醒来了

xv6如何解决:

修改上面的wakeup和sleep函数,用一把锁(p->lock)来保护release->sched为原子的操作

int isdone=0;
void uartWrite(char c,int n)
{
  acquire(&uart_tx_lock);
  while(isdone==0){
    
       // RIGHT-HERE -- INTERRUPT(WAKEUP)
      sleep(&tx_chan,&uart_tx_lock); //release和sech,放弃cpu和切换线程为原子操作
    
  }
   //read buffer characters.
   //doSomething.....
   isdone=0;
   release(&uart_tx_lock);
   return;
}

void
uartintr(void)
{
  // send buffered characters.
  acquire(&uart_tx_lock);
  // doSometing
  isdone=0;
  wakeup(&tx_chan);
  release(&uart_tx_lock);
}

void
sleep(void *chan, struct spinlock *lk)
{
  struct proc *p = myproc();
  if(lk != &p->lock){  //DOC: sleeplock0
    acquire(&p->lock);  //DOC: sleeplock1
      
    //上面的release(&uart_tx_lock),释放condition锁
    release(lk);
  }

  // Go to sleep.
  p->chan = chan;
  p->state = SLEEPING;

  sched();

  // Tidy up.
  p->chan = 0;

  // Reacquire original lock.
  if(lk != &p->lock)
    release(&p->lock);
    //重新获取condition锁
    acquire(lk);
  }
}

void
wakeup(void *chan)
{
  struct proc *p;

  for(p = proc; p < &proc[NPROC]; p++) {
    acquire(&p->lock);
    if(p->state == SLEEPING && p->chan == chan) {
      p->state = RUNNABLE;
    }
    release(&p->lock);
  }
}

AQS

java可以使用AQS的Condition对象来实现类似于上面的同步操作

public static void main(String[] args) throws InterruptedException {
        Lock lock=new ReentrantLock();
        Condition condition = lock.newCondition();
        AtomicInteger flag= new AtomicInteger(0);

        Thread t1 = new Thread(() -> {
            lock.lock();
            try {
                while (flag.get() == 0) {
                    System.out.println("condition.await()");
                    condition.await(); //await -> lock.unlock()-> flag==0(又被别的线程设置为0) ->await
                }
                flag.set(0);
                System.out.println("condition.wakeup");
            }catch (Exception ex){

            }finally {
                lock.unlock();
            }
        });

        Thread t2 = new Thread(() -> {
            lock.lock();
            try {
                Thread.sleep(2000);
                flag.set(1);
                System.out.println("condition.signal()");
                condition.signal();
            }catch (Exception ex){

            }finally {
                lock.unlock();
            }

        });

        t1.start();
        t2.start();

        t1.join();
        t2.join();
    }

AQS的await()

t1,t2都调用lock.lock()为了解决死锁,和xv6的sleep一样在进入await里面首先会释放锁

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    //加入同步队列
    Node node = addConditionWaiter();
    //释放condition锁
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    //1判断是否已经被唤醒了
    while (!isOnSyncQueue(node)) {
        //2park会判断是否唤醒,如果唤醒立即返回
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    
    //获取condition锁
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

AQS有两个队列,同步队列(SyncQueue)和条件队列(ConditionQueue),

调用await,线程状态设置为SLEEP,放弃cpu会进入ConditionQueue放弃cpu;

调用signal,线程状态设置为RUNNABLE会进入SyncQueue等待分配cpu;

await()

  1. 构造线程节点加入ConditionQueue
  2. 释放condition锁
  3. 检查是否在同步队列上

AQS解决 lost-wakeup的两个检查

  1. isOnSyncQueue(node)判断:会一直检查是否在SyncQueue,如果线程1的await调用fullyRelease之后,切换线程2调用signal()把线程移到SyncQueue,再切换await,此时不需要放弃cpu。直接可以结束等待

  2. LockSupport.park() 的安全性:如果线程1的await调用fullyRelease后,再isOnSyncQueue后才切换线程2调用signal(),因为signal会调用unpark,线程1调用park发现已经掉过unpark,会立即返回不会睡眠。

park/unpark 原理

每个线程独立维护自己的 _counter(在 Parker 中)

_counter 含义
0 当前线程没有许可证,需要阻塞等待
1 当前线程已有许可证,可以直接返回

如果没有 _counter

  • unpark(t)t 进入 park() 之前就执行了
  • t 还没开始阻塞,就错过了唤醒信号
  • 于是 t 永远阻塞,死锁发生 ⚠️(lost wake-up

有了 _counter

  • unpark(t) 会设置 t._counter = 1
  • t 调用 park() 时,先检查 _counter == 1
  • 发现已有“许可证”,直接返回,不阻塞
void Parker::park(bool isAbsolute, jlong time) {
    //Atomic::xchg:  原子性的把两个值交换,返回旧值;用来防止lost-wakeup
    if (Atomic::xchg(&_counter, 0) > 0) {
        // 已被 unpark 过,不需阻塞
        return;
    }

    pthread_mutex_lock(&_mutex);
    while (_counter == 0) {
        if (isAbsolute || time > 0) {
            pthread_cond_timedwait(&_cond, &_mutex, &abstime);
        } else {
            pthread_cond_wait(&_cond, &_mutex);
        }
    }
    _counter = 0;
    pthread_mutex_unlock(&_mutex);
}

void Parker::unpark() {
    int s = Atomic::xchg(&_counter, 1);
    if (s < 1) {
        // 如果线程已阻塞,唤醒它
        pthread_mutex_lock(&_mutex);
        pthread_cond_signal(&_cond);  // 唤醒正在等待的线程
        pthread_mutex_unlock(&_mutex);
    }
}