在Linux系統(tǒng)中有大量的臨界資源需要保護(hù),如何讓各個(gè)任務(wù)有條不紊的訪問(wèn)這些資源,這涉及到Linux中并發(fā)訪問(wèn)的保護(hù)機(jī)制設(shè)計(jì)相關(guān)知識(shí)。后面會(huì)詳細(xì)介紹這幾個(gè)機(jī)制。
(據(jù)可靠消息,鎖的實(shí)現(xiàn)經(jīng)常出現(xiàn)在筆試環(huán)節(jié)。既可以考察面試者對(duì)鎖的原理的理解,又可以考察面試者編程技能)。
注:部分代碼都是根據(jù)ARM64架構(gòu)匯編代碼翻譯成C語(yǔ)言并經(jīng)過(guò)精簡(jiǎn)(例如:spin lock、read-write lock)。
也有部分代碼實(shí)現(xiàn)是為了呈現(xiàn)背后設(shè)計(jì)的原理自己編寫的,而不是精簡(jiǎn)Linux中實(shí)現(xiàn)的代碼(例如mutex)。
自旋鎖(spin lock)
自旋鎖是Linux中使用非常頻繁的鎖,原理簡(jiǎn)單。
內(nèi)核當(dāng)發(fā)生訪問(wèn)資源沖突的時(shí)候,可以有兩種鎖的解決方案選擇:
- 一個(gè)是原地等待一個(gè)是掛起當(dāng)前進(jìn)程,調(diào)度其他進(jìn)程執(zhí)行(睡眠)
Spinlock
是內(nèi)核中提供的一種比較常見(jiàn)的鎖機(jī)制,自旋鎖是“原地等待”的方式解決資源沖突的,即,一個(gè)線程獲取了一個(gè)自旋鎖后,另外一個(gè)線程期望獲取該自旋鎖,獲取不到,只能夠原地“打轉(zhuǎn)”(忙等待)。
由于自旋鎖的這個(gè)忙等待的特性,注定了它使用場(chǎng)景上的限制
—— 自旋鎖不應(yīng)該被長(zhǎng)時(shí)間的持有(消耗 CPU 資源),一般應(yīng)用在中斷上下文。
原理
下面以去銀行辦業(yè)務(wù)為例來(lái)講解。
- 銀行的辦事大廳一般會(huì)有幾個(gè)窗口同步進(jìn)行。今天很不巧,只有一個(gè)窗口提供服務(wù)?,F(xiàn)在的銀行服務(wù)都是采用取號(hào)排隊(duì),叫號(hào)服務(wù)的方式。當(dāng)你去銀行辦理業(yè)務(wù)的時(shí)候,首先會(huì)去取號(hào)機(jī)器領(lǐng)取小票,上面寫著你排多少號(hào)。然后你就可以排隊(duì)等待了。一般還會(huì)有個(gè)顯示屏,上面會(huì)顯示一個(gè)數(shù)字(例如:"請(qǐng)xxx號(hào)到1號(hào)窗口辦理"),代表當(dāng)前可以被服務(wù)顧客的排隊(duì)號(hào)碼。每辦理完一個(gè)顧客的業(yè)務(wù),顯示屏上面的數(shù)字都會(huì)增加1。等待的顧客都會(huì)對(duì)比自己手上寫的編號(hào)和顯示屏上面是否一致,如果一致的話,就可以去前臺(tái)辦理業(yè)務(wù)了?,F(xiàn)在早上剛開業(yè),顧客A是今天的第一個(gè)顧客,去取號(hào)機(jī)器領(lǐng)取0號(hào)(next計(jì)數(shù))小票,然后看到顯示屏上顯示0(owner計(jì)數(shù)),顧客A就知道現(xiàn)在輪到自己辦理業(yè)務(wù)了。顧客A到前臺(tái)辦理業(yè)務(wù)(持有鎖)中,顧客B來(lái)了。同樣,顧客B去取號(hào)機(jī)器拿到1號(hào)(next計(jì)數(shù))小票。然后乖乖的坐在旁邊等候。顧客A依然在辦理業(yè)務(wù)中,此時(shí)顧客C也來(lái)了。顧客C去取號(hào)機(jī)器拿到2號(hào)(next計(jì)數(shù))小票。顧客C也乖乖的找個(gè)座位繼續(xù)等待。終于,顧客A的業(yè)務(wù)辦完了(釋放鎖)。然后,顯示屏上面顯示1(owner計(jì)數(shù))。顧客B和C都對(duì)比顯示屏上面的數(shù)字和自己手中小票的數(shù)字是否相等。顧客B終于可以辦理業(yè)務(wù)了(持有鎖)。顧客C依然等待中。顧客B的業(yè)務(wù)辦完了(釋放鎖)。然后,顯示屏上面顯示2(owner計(jì)數(shù))。顧客C終于開始辦理業(yè)務(wù)(持有鎖)。顧客C的業(yè)務(wù)辦完了(釋放鎖)。3個(gè)顧客都辦完了業(yè)務(wù)離開了。只留下一個(gè)銀行柜臺(tái)服務(wù)員。最終,顯示屏上面顯示3(owner計(jì)數(shù))。取號(hào)機(jī)器的下一個(gè)排隊(duì)號(hào)也是3號(hào)(next計(jì)數(shù))。無(wú)人辦理業(yè)務(wù)(鎖是釋放狀態(tài))。
Linux中針對(duì)每一個(gè)spin
lock會(huì)有兩個(gè)計(jì)數(shù)。
分別是next和owner(初始值為0)。進(jìn)程A申請(qǐng)鎖時(shí),會(huì)判斷next和owner的值是否相等。
如果相等就代表鎖可以申請(qǐng)成功,否則原地自旋。直到owner和next的值相等才會(huì)退出自旋。
假設(shè)進(jìn)程A申請(qǐng)鎖成功,然后會(huì)next加1。
此時(shí)owner值為0,next值為1。
進(jìn)程B也申請(qǐng)鎖,保存next得值到局部變量temp(temp=1)中。
由于next和owner值不相等,因此原地自旋讀取owner的值,判斷owner和temp是否相等,直到相等退出自旋狀態(tài)。
當(dāng)然next的值還是加1,變成2。
進(jìn)程A釋放鎖,此時(shí)會(huì)將owner的值加1,那么此時(shí)B進(jìn)程的owner和temp的值都是1,因此B進(jìn)程獲得鎖。當(dāng)B進(jìn)程釋放鎖后,同樣會(huì)將owner的值加1。
最后owner和next都等于2,代表沒(méi)有進(jìn)程持有鎖。next就是一個(gè)記錄申請(qǐng)鎖的次數(shù),而owner是持有鎖進(jìn)程的計(jì)數(shù)值。
實(shí)際場(chǎng)景
一、考慮下面的場(chǎng)景(內(nèi)核搶占場(chǎng)景):
(1)進(jìn)程A在某個(gè)系統(tǒng)調(diào)用過(guò)程中訪問(wèn)了共享資源 R
(2)進(jìn)程B在某個(gè)系統(tǒng)調(diào)用過(guò)程中也訪問(wèn)了共享資源 R
會(huì)不會(huì)造成沖突呢?
假設(shè)在A訪問(wèn)共享資源R的過(guò)程中發(fā)生了中斷,中斷喚醒了沉睡中的,優(yōu)先級(jí)更高的B,在中斷返回現(xiàn)場(chǎng)的時(shí)候,發(fā)生進(jìn)程切換,B啟動(dòng)執(zhí)行,并通過(guò)系統(tǒng)調(diào)用訪問(wèn)了R,如果沒(méi)有鎖保護(hù),則會(huì)出現(xiàn)兩個(gè)thread進(jìn)入臨界區(qū),導(dǎo)致程序執(zhí)行不正確。
OK,我們加上spin lock看看如何:
A在進(jìn)入臨界區(qū)之前獲取了spin lock,同樣的,在A訪問(wèn)共享資源R的過(guò)程中發(fā)生了中斷,中斷喚醒了沉睡中的,優(yōu)先級(jí)更高的B,B在訪問(wèn)臨界區(qū)之前仍然會(huì)試圖獲取spin lock,這時(shí)候由于A進(jìn)程持有spin lock而導(dǎo)致B進(jìn)程進(jìn)入了永久的spin……怎么破?
linux的kernel很簡(jiǎn)單,在A進(jìn)程獲取spin
lock的時(shí)候,禁止本CPU上的搶占(上面的永久spin的場(chǎng)合僅僅在本CPU的進(jìn)程搶占本CPU的當(dāng)前進(jìn)程這樣的場(chǎng)景中發(fā)生)。
如果A和B運(yùn)行在不同的CPU上,那么情況會(huì)簡(jiǎn)單一些:A進(jìn)程雖然持有spin lock而導(dǎo)致B進(jìn)程進(jìn)入spin狀態(tài),不過(guò)由于運(yùn)行在不同的CPU上,A進(jìn)程會(huì)持續(xù)執(zhí)行并會(huì)很快釋放spin lock,解除B進(jìn)程的spin狀態(tài)
二、再考慮下面的場(chǎng)景(中斷上下文場(chǎng)景):
(1)運(yùn)行在CPU0上的進(jìn)程A在某個(gè)系統(tǒng)調(diào)用過(guò)程中訪問(wèn)了共享資源 R
(2)運(yùn)行在CPU1上的進(jìn)程B在某個(gè)系統(tǒng)調(diào)用過(guò)程中也訪問(wèn)了共享資源 R
(3)外設(shè)P的中斷handler中也會(huì)訪問(wèn)共享資源 R
在這樣的場(chǎng)景下,使用spin lock可以保護(hù)訪問(wèn)共享資源R的臨界區(qū)嗎?
我們假設(shè)CPU0上的進(jìn)程A持有spin
lock進(jìn)入臨界區(qū),這時(shí)候,外設(shè)P發(fā)生了中斷事件,并且調(diào)度到了CPU1上執(zhí)行,看起來(lái)沒(méi)有什么問(wèn)題,執(zhí)行在CPU1上的handler會(huì)稍微等待一會(huì)CPU0上的進(jìn)程A,
等它立刻臨界區(qū)就會(huì)釋放spin lock的,但是,如果外設(shè)P的中斷事件被調(diào)度到了CPU0上執(zhí)行會(huì)怎么樣?
CPU0上的進(jìn)程A在持有spin lock的狀態(tài)下被中斷上下文搶占,而搶占它的CPU0上的handler在進(jìn)入臨界區(qū)之前仍然會(huì)試圖獲取spin lock,
悲劇發(fā)生了,CPU0上的P外設(shè)的中斷handler永遠(yuǎn)的進(jìn)入spin狀態(tài),這時(shí)候,CPU1上的進(jìn)程B也不可避免在試圖持有spin lock的時(shí)候失敗而導(dǎo)致進(jìn)入spin狀態(tài)。
為了解決這樣的問(wèn)題,linux kernel采用了這樣的辦法:如果涉及到中斷上下文的訪問(wèn),spin lock需要和禁止本 CPU 上的中斷聯(lián)合使用。
三、再考慮下面的場(chǎng)景(底半部場(chǎng)景)
linux kernel中提供了豐富的bottom half的機(jī)制,雖然同屬中斷上下文,不過(guò)還是稍有不同。
我們可以把上面的場(chǎng)景簡(jiǎn)單修改一下:
外設(shè)P不是中斷handler中訪問(wèn)共享資源R,而是在的bottom half中訪問(wèn)。
使用spin lock+禁止本地中斷當(dāng)然是可以達(dá)到保護(hù)共享資源的效果,但是使用牛刀來(lái)殺雞似乎有點(diǎn)小題大做,這時(shí)候disable bottom half就OK了
四、中斷上下文之間的競(jìng)爭(zhēng)
同一種中斷handler之間在uni core和multi core上都不會(huì)并行執(zhí)行,這是linux kernel的特性。
如果不同中斷handler需要使用spin lock保護(hù)共享資源,對(duì)于新的內(nèi)核(不區(qū)分fast handler和slow handler),所有handler都是關(guān)閉中斷的,因此使用spin lock不需要關(guān)閉中斷的配合。
bottom
half又分成softirq和tasklet,同一種softirq會(huì)在不同的CPU上并發(fā)執(zhí)行,因此如果某個(gè)驅(qū)動(dòng)中的softirq的handler中會(huì)訪問(wèn)某個(gè)全局變量,
對(duì)該全局變量是需要使用spin lock保護(hù)的,不用配合disable CPU中斷或者bottom half。tasklet更簡(jiǎn)單,因?yàn)橥环Ntasklet不會(huì)多個(gè)CPU上并發(fā)。
實(shí)現(xiàn)
我們首先定義描述自旋鎖的結(jié)構(gòu)體arch_spinlock_t。
typedef?struct?{
???union?{
?????????unsigned?int?slock;
?????????struct?__raw_tickets?{
???????????????unsigned?short?owner;?????????
???????????????unsigned?short?next;
???????}?tickets;
??};?
}?arch_spinlock_t;
如上面的原理描述,我們需要兩個(gè)計(jì)數(shù),分別是owner和next。
slock所占內(nèi)存區(qū)域覆蓋owner和next(據(jù)說(shuō)C語(yǔ)言學(xué)好的都能看得懂)。
下面實(shí)現(xiàn)申請(qǐng)鎖操作 arch_spin_lock。
static?inline?void?arch_spin_lock(arch_spinlock_t?*lock)?{
????????arch_spinlock_t?old_lock;
????????old_lock.slock?=?lock->slock;?????????????????????????????????/*?1?*/?
????????lock->tickets.next++;?????????????????????????????????????????/*?2?*/???????????
????????while?(old_lock.tickets.next?!=?old_lock.tickets.owner)?{?????/*?3?*/
????????????old_lock.tickets.owner?=?lock->tickets.owner;????????????/*?4?*/????????
????????}?
}
- 繼續(xù)上面的舉例。顧客從取號(hào)機(jī)器得到排隊(duì)號(hào)。取號(hào)機(jī)器更新下個(gè)顧客將要拿到的排隊(duì)號(hào)??匆幌嘛@示屏,判斷是否輪到自己了。一直盯著顯示屏對(duì)比是不是輪到自己了。
釋放鎖的操作就非常簡(jiǎn)單了。還記得上面銀行辦理業(yè)務(wù)的例子嗎?
釋放鎖的操作僅僅是顯示屏上面的排隊(duì)號(hào)加1。我們僅僅需要將owner計(jì)數(shù)加1即可。arch_spin_unlock實(shí)現(xiàn)如下。
static?inline?void?arch_spin_unlock(arch_spinlock_t?*lock)?{
?????????lock->tickets.owner++;?
}
信號(hào)量(semaphore)
信號(hào)量又稱為信號(hào)燈,它是用來(lái)協(xié)調(diào)不同進(jìn)程間的數(shù)據(jù)對(duì)象的,而最主要的應(yīng)用是共享內(nèi)存方式的進(jìn)程間通信。本質(zhì)上,信號(hào)量是一個(gè)計(jì)數(shù)器,它用來(lái)記錄對(duì)某個(gè)資源(如共享內(nèi)存)的存取狀況。
它負(fù)責(zé)協(xié)調(diào)各個(gè)進(jìn)程,以保證他們能夠正確、合理的使用公共資源。它和spin lock最大的不同之處就是:無(wú)法獲取信號(hào)量的進(jìn)程可以睡眠,因此會(huì)導(dǎo)致系統(tǒng)調(diào)度。一般說(shuō)來(lái),為了獲得共享資源,進(jìn)程需要執(zhí)行下列操作:
(1) 測(cè)試控制該資源的信號(hào)量。
(2) 若此信號(hào)量的值為正,則允許進(jìn)行使用該資源。進(jìn)程將信號(hào)量減1。
(3) 若此信號(hào)量為0,則該資源目前不可用,進(jìn)程進(jìn)入睡眠狀態(tài),直至信號(hào)量值大于0,進(jìn)程被喚醒,轉(zhuǎn)入步驟(1)。
(4) 當(dāng)進(jìn)程不再使用一個(gè)信號(hào)量控制的資源時(shí),信號(hào)量值加1。如果此時(shí)有進(jìn)程正在睡眠等待此信號(hào)量,則喚醒此進(jìn)程。
維護(hù)信號(hào)量狀態(tài)的是Linux內(nèi)核操作系統(tǒng)而不是用戶進(jìn)程。
我們可以從頭文件/usr/src/linux/include/linux/sem.h 中看到內(nèi)核用來(lái)維護(hù)信號(hào)量狀態(tài)的各個(gè)結(jié)構(gòu)的定義。信號(hào)量是一個(gè)數(shù)據(jù)集合,用戶可以單獨(dú)使用這一集合的每個(gè)元素。
信號(hào)量(semaphore)是進(jìn)程間通信處理同步互斥的機(jī)制。是在多線程環(huán)境下使用的一種措施,它負(fù)責(zé)協(xié)調(diào)各個(gè)進(jìn)程,以保證他們能夠正確、合理的使用公共資源。它和spin lock最大的不同之處就是:無(wú)法獲取信號(hào)量的進(jìn)程可以睡眠,因此會(huì)導(dǎo)致系統(tǒng)調(diào)度。
原理
信號(hào)量一般可以用來(lái)標(biāo)記可用資源的個(gè)數(shù)。
舉2個(gè)生活中的例子:
- 我們要坐火車從南京到新疆,這個(gè)'任務(wù)'特別的耗時(shí),只能在車上等著車到站,但是我們沒(méi)有必要一直睜著眼睛等著車到站,最好的情況就是我們上車就直接睡覺(jué),醒來(lái)就到站,這樣從人(用戶)的角度來(lái)說(shuō),體驗(yàn)是最好的,對(duì)比于進(jìn)程,程序在等待一個(gè)耗時(shí)的任務(wù)的時(shí)候,沒(méi)有必須要占用CPU,可以暫停當(dāng)前任務(wù)使其進(jìn)入休眠狀態(tài),當(dāng)?shù)却氖录l(fā)生之后再由其他任務(wù)喚醒,這種場(chǎng)景采用信號(hào)量比較合適。我們?cè)诘却娞荨⒌却词珠g,這種場(chǎng)景需要等待的事件并不是很多,如果我們還要找個(gè)地方睡一覺(jué),然后等電梯到了或者洗手間可以用了再醒來(lái),那很顯然這也沒(méi)有必要,我們只需要排好隊(duì),刷一刷抖音就可以了,對(duì)比于計(jì)算機(jī)程序,比如驅(qū)動(dòng)在進(jìn)入中斷例程,在等待某個(gè)寄存器被置位,這種場(chǎng)景需要等待的時(shí)間很短暫,系統(tǒng)開銷遠(yuǎn)小于進(jìn)入休眠的開銷,所以這種場(chǎng)景采用自旋鎖比較合適。
實(shí)現(xiàn)
為了記錄可用資源的數(shù)量,我們肯定需要一個(gè)count計(jì)數(shù),標(biāo)記當(dāng)前可用資源數(shù)量。當(dāng)然還要一個(gè)可以像圖書管理員一樣的筆記本功能。
用來(lái)記錄等待借書的同學(xué)。所以,一個(gè)雙向鏈表即可。因此只需要一個(gè)count計(jì)數(shù)和等待進(jìn)程的鏈表頭即可。描述信號(hào)量的結(jié)構(gòu)體如下。
struct?semaphore?{
????unsigned?int?count;
????struct?list_head?wait_list;?
};
在Linux中,每個(gè)進(jìn)程就相當(dāng)于是每個(gè)借書的同學(xué)。
通知一個(gè)同學(xué),就相當(dāng)于喚醒這個(gè)進(jìn)程。因此,我們還需要一個(gè)結(jié)構(gòu)體記錄當(dāng)前的進(jìn)程信息(task_struct)。
struct?semaphore_waiter?{
????struct?list_head?list;
????struct?task_struct?*task;?
};
struct semaphore_waiter的list成員是當(dāng)進(jìn)程無(wú)法獲取信號(hào)量的時(shí)候掛入semaphore的wait_list成員。task成員就是記錄后續(xù)被喚醒的進(jìn)程信息。
將當(dāng)前進(jìn)程賦給task,并利用其list成員將該變量的節(jié)點(diǎn)加入到以sem中的wait_list為頭部的一個(gè)列表中,假設(shè)有多個(gè)進(jìn)程在sem上調(diào)用down_interruptible,則sem的wait_list上形成的隊(duì)列如下圖:
(注:將一個(gè)進(jìn)程阻塞,一般的經(jīng)過(guò)是先把進(jìn)程放到等待隊(duì)列中,接著改變進(jìn)程的狀態(tài),比如設(shè)為TASK_INTERRUPTIBLE,然后調(diào)用調(diào)度函數(shù)schedule(),后者將會(huì)把當(dāng)前進(jìn)程從cpu的運(yùn)行隊(duì)列中摘下)
一切準(zhǔn)備就緒,現(xiàn)在就可以實(shí)現(xiàn)信號(hào)量的申請(qǐng)函數(shù)。
void?down(struct?semaphore?*sem)?{
?????struct?semaphore_waiter?waiter;
?????if?(sem->count?>?0)?{
?????????sem->count--;??/*?1?*/
?????????return;????????????????????????????????????
?????}
?????waiter.task?=?current;??????????????????????????/*?2?*/
?????list_add_tail(&waiter.list,?&sem->wait_list);???/*?2?*/
?????schedule();?????????????????????????????????????/*?3?*/
}
- 如果信號(hào)量標(biāo)記的資源還有剩余,自然可以成功獲取信號(hào)量。只需要遞減可用資源計(jì)數(shù)。既然無(wú)法獲取信號(hào)量,就需要將當(dāng)前進(jìn)程掛入信號(hào)量的等待隊(duì)列鏈表上。schedule()主要是觸發(fā)任務(wù)調(diào)度的示意函數(shù),主動(dòng)讓出CPU使用權(quán)。在讓出之前,需要將當(dāng)前進(jìn)程從運(yùn)行隊(duì)列上移除。
釋放信號(hào)的實(shí)現(xiàn)也是比較簡(jiǎn)單。實(shí)現(xiàn)如下。
void?up(struct?semaphore?*sem)?{
????struct?semaphore_waiter?waiter;
????if?(list_empty(&sem->wait_list))?{
???????sem->count++;??????????????????????????/*?1?*/
???????return;?
????}
????waiter?=?list_first_entry(&sem->wait_list,?struct?semaphore_waiter,?list);?
????list_del(&waiter->list);??????????????????/*?2?*/
????wake_up_process(waiter->task);????????????/*?2?*/
}
- 如果等待鏈表沒(méi)有進(jìn)程,那么自然只需要增加資源計(jì)數(shù)。從等待進(jìn)程鏈表頭取出第一個(gè)進(jìn)程,并從鏈表上移除。然后就是喚醒該進(jìn)程。
讀寫鎖(read-write lock)
不管是自旋鎖還是信號(hào)量在同一時(shí)間只能有一個(gè)進(jìn)程進(jìn)入臨界區(qū)。對(duì)于有些情況,我們是可以區(qū)分讀寫操作的。因此,我們希望對(duì)于讀操作的進(jìn)程可以并發(fā)進(jìn)行。對(duì)于寫操作只限于一個(gè)進(jìn)程進(jìn)入臨界區(qū)。而這種同步機(jī)制就是讀寫鎖。讀寫鎖一般具有以下幾種性質(zhì)。
- 同一時(shí)間有且僅有一個(gè)寫進(jìn)程進(jìn)入臨界區(qū)。在沒(méi)有寫進(jìn)程進(jìn)入臨界區(qū)的時(shí)候,同時(shí)可以有多個(gè)讀進(jìn)程進(jìn)入臨界區(qū)。讀進(jìn)程和寫進(jìn)程不可以同時(shí)進(jìn)入臨界區(qū)。
讀寫鎖有兩種,一種是信號(hào)量類型,另一種是spin lock類型。下面以spin lock類型講解。
原理
下面我們用廁所模型來(lái)理解讀寫鎖。
我發(fā)現(xiàn)公司一般都會(huì)有保潔阿姨打掃廁所。如果以男廁所為例的話,我覺(jué)得男士進(jìn)入廁所就相當(dāng)于讀者進(jìn)入臨界區(qū)。因?yàn)榭梢杂卸鄠€(gè)男士進(jìn)廁所。而保潔阿姨進(jìn)入男士廁所就相當(dāng)于寫者進(jìn)入臨界區(qū)。
假設(shè)A男士發(fā)現(xiàn)保潔阿姨不在打掃廁所,就進(jìn)入廁所。隨后B和C同時(shí)也進(jìn)入廁所。
然后保潔阿姨準(zhǔn)備打掃廁所,發(fā)現(xiàn)有男士在廁所里面,因此只能在門口等待。
ABC都離開了廁所。保潔阿姨迅速進(jìn)入廁所打掃。然后D男士去上廁所,發(fā)現(xiàn)保潔阿姨在里面?;伊锪锏某鰜?lái)了在門口等著?,F(xiàn)在體會(huì)到了寫者(保潔阿姨)具有排他性,讀者(男士)可以并發(fā)進(jìn)入臨界區(qū)了吧。
既然我們?cè)试S多個(gè)讀者進(jìn)入臨界區(qū),因此我們需要一個(gè)計(jì)數(shù)統(tǒng)計(jì)讀者的個(gè)數(shù)。同時(shí),由于寫者永遠(yuǎn)只存在一個(gè)進(jìn)入臨界區(qū),因此只需要一個(gè)bit標(biāo)記是否有寫進(jìn)程進(jìn)入臨界區(qū)。
所以,我們可以將兩個(gè)計(jì)數(shù)合二為一。只需要1個(gè)unsigned
int類型即可。最高位(bit31)代表是否有寫者進(jìn)入臨界區(qū),低31位(0~30bit)統(tǒng)計(jì)讀者個(gè)數(shù)。
在這里插入圖片描述
實(shí)現(xiàn)
描述讀寫鎖只需要1個(gè)變量即可,因此我們可以定義讀寫鎖的結(jié)構(gòu)體如下。
typedef?struct?{
????????volatile?unsigned?int?lock;
}?arch_rwlock_t;
既然區(qū)分讀寫操作,因此肯定會(huì)有兩個(gè)申請(qǐng)鎖函數(shù),分別是讀和寫。首先,我們看一下read_lock操作的實(shí)現(xiàn)。
static?inline?void?arch_read_lock(arch_rwlock_t?*rw)
{
????????unsigned?int?tmp;
????????do?{
????????????????tmp?=?rw->lock;
????????????????tmp++;????????????????????????????/*?1?*/
????????}?while(tmp?&?(1?<<?31));?????????????????/*?2?*/
????????rw->lock?=?tmp;
}
- 增加讀者計(jì)數(shù),最后會(huì)更新到rw->lock中。更新rw->lock前提是沒(méi)有寫者,因此這里會(huì)判斷是否有寫者已經(jīng)進(jìn)入臨界區(qū)(判斷方法是rw->lock變量bit 31的值)。如果,有寫者已經(jīng)進(jìn)入臨界區(qū),就在這里循環(huán)。一直等到寫者離開。
當(dāng)讀進(jìn)程離開臨界區(qū)的時(shí)候會(huì)調(diào)用read_unlock釋放鎖。read_unlock實(shí)現(xiàn)如下。
static?inline?void?arch_read_unlock(arch_rwlock_t?*rw)
{
????????rw->lock--;
}
遞減讀者計(jì)數(shù)即可。
讀操作看完了,我們看看寫操作是如何實(shí)現(xiàn)的。arch_write_lock實(shí)現(xiàn)如下。
static?inline?void?arch_write_lock(arch_rwlock_t?*rw)
{
????????unsigned?int?tmp;
????????do?{
????????????????tmp?=?rw->lock;
????????}?while(tmp);???????????????????????/*?1?*/
????????rw->lock?=?1?<<?31;?????????????????/*?2?*/
}
- 由于寫者是排他的(讀者和寫者都不能有),因此這里只有rw->lock的值為0,當(dāng)前的寫者才可以進(jìn)入臨界區(qū)。置位rw->lock的bit31,代表有寫者進(jìn)入臨界區(qū)。
當(dāng)寫進(jìn)程離開臨界區(qū)的時(shí)候會(huì)調(diào)用write_unlock釋放鎖。write_unlock實(shí)現(xiàn)如下。
static?inline?void?arch_write_unlock(arch_rwlock_t?*rw)
{
????????rw->lock?=?0;
}
同樣由于寫者是排他的,因此只需要將rw->lock置0即可。代表沒(méi)有任何進(jìn)程進(jìn)入臨界區(qū)。
畢竟是因?yàn)橥粫r(shí)間只能有一個(gè)寫者進(jìn)入臨界區(qū),當(dāng)這個(gè)寫者離開臨界區(qū)的時(shí)候,肯定是意味著現(xiàn)在沒(méi)有任何進(jìn)程進(jìn)入臨界區(qū)。
以上的代碼實(shí)現(xiàn)其實(shí)會(huì)導(dǎo)致寫進(jìn)程餓死現(xiàn)象。
例如,A、B、C三個(gè)進(jìn)程進(jìn)入讀臨界區(qū),D進(jìn)程嘗試獲得寫鎖,此時(shí)只能等待A、B、C三個(gè)進(jìn)程退出臨界區(qū)。如果在推出之前又有F、G進(jìn)程進(jìn)入讀臨界區(qū),那么將出現(xiàn)D進(jìn)程餓死現(xiàn)象。
互斥體(mutex)
互斥體實(shí)現(xiàn)了“互相排斥”(mutual exclusion)同步的簡(jiǎn)單形式(所以名為互斥體(mutex))。
互斥體禁止多個(gè)線程同時(shí)進(jìn)入受保護(hù)的代碼“臨界區(qū)”(critical section)。因此,在任意時(shí)刻,只有一個(gè)線程被允許進(jìn)入這樣的代碼保護(hù)區(qū)。
任何線程在進(jìn)入臨界區(qū)之前,必須獲取(acquire)與此區(qū)域相關(guān)聯(lián)的互斥體的所有權(quán)。
如果已有另一線程擁有了臨界區(qū)的互斥體,其他線程就不能再進(jìn)入其中。這些線程必須等待,直到當(dāng)前的屬主線程釋放(release)該互斥體。
什么時(shí)候需要使用互斥體呢?
互斥體用于保護(hù)共享的易變代碼,也就是,全局或靜態(tài)數(shù)據(jù)。這樣的數(shù)據(jù)必須通過(guò)互斥體進(jìn)行保護(hù),以防止它們?cè)诙鄠€(gè)線程同時(shí)訪問(wèn)時(shí)損壞。
前文提到的semaphore在初始化count計(jì)數(shù)的時(shí)候,可以分為計(jì)數(shù)信號(hào)量和互斥信號(hào)量(二值信號(hào)量)。
mutex和初始化計(jì)數(shù)為1的二值信號(hào)量有很大的相似之處。他們都可以用做資源互斥。但是mutex卻有一個(gè)特殊的地方:只有持鎖者才能解鎖。
但是,二值信號(hào)量卻可以在一個(gè)進(jìn)程中獲取信號(hào)量,在另一個(gè)進(jìn)程中釋放信號(hào)量。如果是應(yīng)用在嵌入式應(yīng)用的RTOS,針對(duì)mutex的實(shí)現(xiàn)還會(huì)考慮優(yōu)先級(jí)反轉(zhuǎn)問(wèn)題。
原理
既然mutex是一種二值信號(hào)量,因此就不需要像semaphore那樣需要一個(gè)count計(jì)數(shù)。
由于mutex具有“持鎖者才能解鎖”的特點(diǎn),所以我們需要一個(gè)變量owner記錄持鎖進(jìn)程。釋放鎖的時(shí)候必須是同一個(gè)進(jìn)程才能釋放。
當(dāng)然也需要一個(gè)鏈表頭,主要用來(lái)便利睡眠等待的進(jìn)程。原理和semaphore及其相似,因此在代碼上也有體現(xiàn)。
實(shí)現(xiàn)
mutex的實(shí)現(xiàn)代碼和Linux中實(shí)現(xiàn)會(huì)有差異,但是依然可以為你呈現(xiàn)設(shè)計(jì)的原理。
下面的設(shè)計(jì)代碼更像是部分RTOS中的代碼。mutex和semaphore一樣,我們需要兩個(gè)類似的結(jié)構(gòu)體分別描述mutex。
struct?mutex_waiter?{
????????struct?list_head???list;
????????struct?task_struct?*task;
};
struct?mutex?{
????long???owner;
????struct?list_head?wait_list;
};
struct mutex_waiter的list成員是當(dāng)進(jìn)程無(wú)法獲取互斥量的時(shí)候掛入mutex的wait_list鏈表。
首先實(shí)現(xiàn)申請(qǐng)互斥量的函數(shù)。
void?mutex_take(struct?mutex?*mutex)
{
????????struct?mutex_waiter?waiter;
????????if?(!mutex->owner)?{
????????????????mutex->owner?=?(long)current;???????????/*?1?*/
????????????????return;
????????}
????????waiter.task?=?current;
????????list_add_tail(&waiter.list,?&mutex->wait_list);?/*?2?*/
????????schedule();?????????????????????????????????????/*?2?*/
}
- 當(dāng)mutex->owner的值為0的時(shí)候,代表沒(méi)有任何進(jìn)程持有鎖。因此可以直接申請(qǐng)成功。然后,記錄當(dāng)前申請(qǐng)鎖進(jìn)程的task_struct。既然不能獲取互斥量,自然就需要睡眠等待,掛入等待鏈表。
互斥量的釋放代碼實(shí)現(xiàn)也同樣和semaphore有很多相似之處。
int?mutex_release(struct?mutex?*mutex)
{
????????struct?mutex_waiter?*waiter;
????????if?(mutex->owner?!=?(long)current)?????????????????????????/*?1?*/
????????????????return?-1;
????????if?(list_empty(&mutex->wait_list))?{
????????????????mutex->owner?=?0;??????????????????????????????????/*?2?*/
????????????????return?0;
????????}
????????waiter?=?list_first_entry(&mutex->wait_list,?struct?mutex_waiter,?list);
????????list_del(&waiter->list);
????????mutex->owner?=?(long)waiter->task;?????????????????????????/*?3?*/
????????wake_up_process(waiter->task);?????????????????????????????/*?4?*/
????????return?0;
}?
- mutex具有“持鎖者才能解鎖”的特點(diǎn)就是在這行代碼體現(xiàn)。如果等待鏈表沒(méi)有進(jìn)程,那么自然只需要將mutex->owner置0,代表沒(méi)有鎖是釋放狀態(tài)。mutex->owner的值改成當(dāng)前可以持鎖進(jìn)程的task_struct。從等待進(jìn)程鏈表取出第一個(gè)進(jìn)程,并從鏈表上移除。然后就是喚醒該進(jìn)程。
end