RWMutex的实现原理

March 3, 2018

 

使用场景

读写锁适用于读多写少的场景。基础特性包括:

  • RLock被reader占用时,Lock(/WLock)会被block。反之依然
  • RLock被占用时,其他读线程也可以获取到RLock。即允许多个reader同时访问共享资源
  • WLock同一时间只能被一个routine/thread获取

根据不同的优先级策略,可以有不同的实现方案:

  • 读优先(Read-preferring RW locks)

    读优先允许最大量的并发,但是竞争激烈时可能导致write-starvation。因为读多写少的场景下,如果读请求的优先级更高,写锁获取成功的概率就更低了。

  • 写优先(Write-preferring RW locks)

    为了避免write-starvation的情况出现,给WLock的优先级更高,如果RLock已经被占用,那么WLock也将会在block中, 此时后续的新到的RLock调用也将被block,直到RLock被释放,WLock被writer获取成功后释放。

    对比读优先,所允许的并发量相对要小一些。且实现更为复杂,性能也相对也要差一些。

  • 总结:A good way to think about it is RWMutex is a Mutex with a reader counter. RLock increments the counter while RUnlock decrements it. A call to Lock will block as long as that counter is > 0.

实现原理

一些大致的实现思路

思路一

一个写锁(mutex)保证writer互斥

一个读锁和计数器实现读锁。读锁用户维护计数器的准确性。计数器用于多个reader计数

读优先的实现:

[go]
//读锁
RLock()
    r.lock()
    r_cnt++
    if r_cnt==1: // 读写互斥
        w.lock()
    r.unlock()

RUnlock()
    r.lock()
    r_cnt--
    if r_cnt==0:
        w.unlock()
    r.unlock()

//写锁
Lock()
    w.lock()

Unlock()
    w.unlock()

思路二

使用锁和条件变量

[go]
RLock()
    m.lock()
    while w: // wLock使用中,block到被唤醒
        c.wait(m)
    r++
    m.unlock()

RUnlock()
    m.lock()
    r--
    if r==0:
        c.signal_all() // 唤醒writer
    m.unlock()

Lock()
    m.lock()
    while w || r > 0 :  // 有reader,block到被唤醒
        c.wait(m)
    w = true
    m.unlock()

Unlock()
    m.lock()
    w = false
    c.signal_all()  // 唤醒reader或writer
    m.unlock()

golang的实现

为避免writer被饿死,RWMutex中至少需要记录三个信息:

  • 使用中的reader计数器:a
  • writer请求中/使用中的标志:b
  • 挂起在writer之后的reader:c

golang用两个变量包括了以上三部分信息,具体见golang的实现(忽略race部分)

[go]
type RWMutex struct {
	w           Mutex  // held if there are pending writers
	writerSem   uint32 // semaphore for writers to wait for completing readers. writer等待reader完成的信号量
	readerSem   uint32 // semaphore for readers to wait for completing writers. reader等待writer完成的信号量

    // 计数器承担两个作用,一个是用来记录reader的个数,另一个作用用作判断是否有writer挂起或者在占有锁
	readerCount int32  // number of pending readers. 挂起的reader和使用中的reader,如果有writer操作(挂起/占有),计数器会变为负数以示区分
	readerWait  int32  // number of departing readers. 使用中的reader
}

func (rw *RWMutex) RLock() {
	if race.Enabled {
		_ = rw.w.state
		race.Disable()
	}

    // 没有writer时只需执行+1操作.
    // 如果有writer调用,计数器会被置为负数。此时reader被block,等待writer unlock后唤醒
	if atomic.AddInt32(&rw.readerCount, 1) < 0 {
		// A writer is pending, wait for it.
		runtime_Semacquire(&rw.readerSem)
	}

	if race.Enabled {
		race.Enable()
		race.Acquire(unsafe.Pointer(&rw.readerSem))
	}
}

// RUnlock undoes a single RLock call;
// it does not affect other simultaneous readers.
// It is a run-time error if rw is not locked for reading
// on entry to RUnlock.
func (rw *RWMutex) RUnlock() {
	if race.Enabled {
		_ = rw.w.state
		race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
		race.Disable()
	}

    // 计数器小于0,表示有writer在等待
	if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
		if r+1 == 0 || r+1 == -rwmutexMaxReaders {
			race.Enable()
			throw("sync: RUnlock of unlocked RWMutex")
		}
		// A writer is pending.
		// 使用中的reader已全部释放,唤醒等待中的writer
		if atomic.AddInt32(&rw.readerWait, -1) == 0 {
			// The last reader unblocks the writer.
			runtime_Semrelease(&rw.writerSem, false)
		}
	}
	if race.Enabled {
		race.Enable()
	}
}

// Lock locks rw for writing.
// If the lock is already locked for reading or writing,
// Lock blocks until the lock is available.
func (rw *RWMutex) Lock() {
	if race.Enabled {
		_ = rw.w.state
		race.Disable()
	}

	// First, resolve competition with other writers.
	rw.w.Lock()

	// Announce to readers there is a pending writer.
	r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
	// Wait for active readers.
	if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
		runtime_Semacquire(&rw.writerSem)
	}
	if race.Enabled {
		race.Enable()
		race.Acquire(unsafe.Pointer(&rw.readerSem))
		race.Acquire(unsafe.Pointer(&rw.writerSem))
	}
}

// Unlock unlocks rw for writing. It is a run-time error if rw is
// not locked for writing on entry to Unlock.
//
// As with Mutexes, a locked RWMutex is not associated with a particular
// goroutine. One goroutine may RLock (Lock) an RWMutex and then
// arrange for another goroutine to RUnlock (Unlock) it.
func (rw *RWMutex) Unlock() {
	if race.Enabled {
		_ = rw.w.state
		race.Release(unsafe.Pointer(&rw.readerSem))
		race.Release(unsafe.Pointer(&rw.writerSem))
		race.Disable()
	}

	// Announce to readers there is no active writer.
	r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
	if r >= rwmutexMaxReaders {
		race.Enable()
		throw("sync: Unlock of unlocked RWMutex")
	}
	// Unblock blocked readers, if any.
	for i := 0; i < int(r); i++ {
		runtime_Semrelease(&rw.readerSem, false)
	}
	// Allow other writers to proceed.
	rw.w.Unlock()
	if race.Enabled {
		race.Enable()
	}
}

参考

See all postsSee all posts