文章底部有推薦有關(guān)嵌入式的學習教程,韋老師良心之作,積累項目經(jīng)驗,業(yè)界口碑非常好。
所謂的內(nèi)核同步就是對共享資源進行保護,防止并發(fā)訪問。
如果有多個執(zhí)行線程(指任何正在執(zhí)行的代碼實例,比如一個在內(nèi)核執(zhí)行的進程,
一個中斷處理程序,或一個內(nèi)核線程)同時訪問和操作共享的數(shù)據(jù),
就有可能造成進程之間互相覆蓋共享數(shù)據(jù),造成被訪問數(shù)據(jù)處于不一致的情況。
這種錯誤很難跟蹤和調(diào)試,但非常重要。
要做到對共享資源的恰當保護是很困難的。
a.linux2.0以前的時代
在多年前,linux還沒有支持對稱多處理器SMP的時候,避免并發(fā)數(shù)據(jù)訪問相對簡單。
在單處理器的時候,只有在中斷發(fā)生的時候,或是在內(nèi)核代碼顯式地請求重新調(diào)度(調(diào)用schedule())時,數(shù)據(jù)才可能被并發(fā)訪問。
b.linux2.0以后的時代
從2.0開始,linux開始支持SMP.
此時如果不加保護,運行在兩個不同處理器上的內(nèi)核代碼完全可能在同一時刻并發(fā)訪問共享數(shù)據(jù)。
到2.6時,linux已經(jīng)發(fā)展成搶占式內(nèi)核,
在不加保護的時候,調(diào)度程序可以在任何時刻搶占正在運行的內(nèi)核代碼,重新調(diào)度其他的進程運行。
隨著內(nèi)核的發(fā)展,共享數(shù)據(jù)的保護已經(jīng)成為驅(qū)動開發(fā)中最困難的問題之一,而這些問題在用戶應用程序中并不常見。
(2)臨界區(qū)和競爭條件
a.臨界區(qū)(critical region)
所謂臨界區(qū)就是訪問和操作共享數(shù)據(jù)的代碼段。
因為多個執(zhí)行線程并發(fā)訪問同一個資源通常是不安全的,所以程序員必須保證這些代碼段原子地執(zhí)行,
也就是說,代碼在執(zhí)行前不可被打斷,就如同整個臨界區(qū)是一個不可分割的指令一樣。
b.競爭條件(race condition)
如果發(fā)生了兩個執(zhí)行線程處于同一個臨界區(qū)的情況,我們稱這就是一個競爭條件。
這是程序包含的一個bug。競爭引起的錯誤很難重現(xiàn),所以非常難調(diào)試。
c.同步(synchronization)
避免并發(fā)和防止競爭條件就被稱為同步。
d.單個變量形成的臨界區(qū)
一個非常簡單的共享資源:
int i; /* 共享資源,全局整型變量 */
...
i++; /* 臨界區(qū),對共享資源的操作 */
這個臨界區(qū)可以轉(zhuǎn)化成下面的機器指令:
*得到當前變量i的值并且拷貝到一個寄存器
*將寄存器中的值加1
*把i的新值寫回到內(nèi)存中
如果有兩個執(zhí)行線程同時進入臨界區(qū),
假設(shè)i的初始值為7,我們期望的結(jié)果如下:
線程1 線程2
獲得i(7) -
增加i(7->8) -
寫回i(8) -
獲得i(8)
增加i(8->9)
寫回i(9)
實際的執(zhí)行結(jié)果卻可能是:
線程1 線程2
獲得i(7) -
- 獲得i(7)
增加i(7->8) -
- 增加i(7->8)
寫回i(8) -
- 寫回i(8)
e.如何處理上面的臨界區(qū)
這種是最簡單的競爭條件,只要把操作指令變成原子的就可以了。
多數(shù)處理器都提供了指令來原子地讀變量,增加變量然后再寫回變量。
如果使用原子變量,唯一可能的結(jié)果就是:
線程1 線程2
增加i(7->8) -
- 增加i(8->9)
或者是:
線程1 線程2
- 增加i(7->8)
增加i(8->9) -
兩個原子操作交錯執(zhí)行更本就不可能發(fā)生,
因為處理器會從物理上確保這種不可能。
(3)加鎖
當涉及到對數(shù)據(jù)結(jié)構(gòu)的操作時,比如對鏈表的處理時,
就不可能僅通過原子指令來保證同步,此時,需要一種鎖機制。
程序中的鎖機制就像日常生活中的門鎖,門后的房間就是臨界區(qū)。
房間中同一時間只能有一個線程。
任何要訪問隊列的代碼首先要占住相應的鎖,這樣該鎖就能阻止來自其他執(zhí)行線程的并發(fā)訪問:
線程1 線程2
試圖鎖定隊列 試圖鎖定隊列
成功:獲得鎖 失?。旱却?..
訪問隊列... 等待...
為隊列解除鎖 等待...
... 成功:獲得鎖
訪問隊列...
為隊列解除鎖
鎖的使用是自愿的,非強制的,它完全屬于一種編程者自選的編程手段。
鎖有多種形式,而且加鎖的粒度范圍也各不相同。
linux實現(xiàn)了幾種不同的鎖機制,
各種鎖機制之間的區(qū)別主要在于當鎖被爭用時的行為:
一些鎖被爭用時會簡單地進行忙等待(spinlock)
一些鎖會使當前任務睡眠直到鎖可用為止(semaphore)
(4)是什么造成了并發(fā)執(zhí)行
a.用戶空間的并發(fā)原因
用戶空間之所以需要同步,是因為用戶程序會被調(diào)度程序搶占和重新調(diào)度。
在單cpu上,并發(fā)操作并不是真的同時發(fā)生,
而是交錯執(zhí)行,稱為偽并發(fā)。
如果是SMP系統(tǒng),兩個進程就可以真正在臨界區(qū)中同時執(zhí)行了。這稱為真并發(fā)。
用戶空間可能產(chǎn)生并發(fā)的地方有:
*共享內(nèi)存
同一個進程的兩個可執(zhí)行線程,訪問共享的內(nèi)存時可能因為被調(diào)度程序搶占后發(fā)生重新調(diào)度而并發(fā)
*信號
信號處理是異步的,如果信號處理程序和進程的其他部分共享數(shù)據(jù),則有可能并發(fā)
b.內(nèi)核空間的并發(fā)原因
*中斷
中斷幾乎可以在任何時刻異步發(fā)生,也就可能隨時打斷當前正在執(zhí)行的代碼
*軟中斷和tasklet
內(nèi)核能在任何時刻喚醒或調(diào)度軟中斷和tasklet,打斷當前正在執(zhí)行的代碼
*內(nèi)核搶占
因為內(nèi)核具有搶占性,所以內(nèi)核中的任務可能被另一個任務搶占
*睡眠及與用戶空間的同步
睡眠可能導致用戶進程切換
*對稱多處理SMP
多個處理器可以同時執(zhí)行代碼
內(nèi)核開發(fā)者必須理解這些并發(fā)執(zhí)行的原因,并為其做好準備。
注意!辨認出真正需要共享的數(shù)據(jù)和相應的臨界區(qū),才是真正有挑戰(zhàn)性的地方。
在編寫代碼的開始階段就要設(shè)計出恰當?shù)逆i,而不是事后才想到。
(5)要保護些什么
找出哪些數(shù)據(jù)需要保護是關(guān)鍵所在。
基本上除了執(zhí)行線程的內(nèi)部數(shù)據(jù)不需要鎖,其他所有的數(shù)據(jù)都可能需要鎖保護。
加鎖經(jīng)驗:
如果有其他執(zhí)行線程可以訪問某些數(shù)據(jù),那么就給這些數(shù)據(jù)加上某種形式的鎖。
記??!要給數(shù)據(jù)而不是代碼加鎖。
內(nèi)核的配置選項
CONFIG_SMP
可以選擇不支持smp。許多加鎖問題在單處理器上是不存在的。
如果在內(nèi)核配置時選擇了CONFIG_SMP,則不必要的代碼就不會被編入針對單處理器的內(nèi)核映像
CONFIG_PREEMPT
配置內(nèi)核是否支持搶占。
在你編寫內(nèi)核代碼時,要問自己下面這些問題:
a.這個數(shù)據(jù)是不是全局的?
除了當前的線程外,其他線程能不能訪問它?
b.這個數(shù)據(jù)會不會在進程上下文和中斷上下文中共享?
它是不是要在兩個不同的中斷處理程序中共享?
c.進程在訪問數(shù)據(jù)時可不可能被搶占?
被調(diào)度的新程序會不會訪問同一數(shù)據(jù)?
d.當前進程是不是會睡眠(阻塞)在某些資源上?
如果是,它會讓共享數(shù)據(jù)處于何種狀態(tài)?
e.如果這個函數(shù)又在另一個處理器上被調(diào)度將會發(fā)生什么?
f.你要對這些代碼做什么?
簡言之,幾乎訪問所有的內(nèi)核全局變量和共享數(shù)據(jù)都需要某種形式的同步方法。
(6)死鎖
死鎖的產(chǎn)生需要一定的條件:
需要一個或多個執(zhí)行線程和一個或多個資源,每個線程都在等待某個已經(jīng)被占用的資源。
例如交通路口的擁堵。
a.自死鎖
如果代碼已經(jīng)獲得了某個鎖,又再次去獲得它,就會造成自死鎖。
如:
線程1
獲得鎖
再次試圖獲得鎖
等待鎖重新可用
b.ABBA死鎖
線程1 線程2
獲得A鎖 獲得B鎖
試圖獲得B鎖 試圖獲得A鎖
等待B鎖 等待A鎖
c.避免死鎖的簡單規(guī)則
*加鎖的順序是關(guān)鍵
使用嵌套的鎖時必須保證以相同的順序獲得鎖。
比如上面的ABBA死鎖,如果所有進程都按照先獲得A鎖再獲得B鎖的順序,就不會死鎖了
*不要重復請求同一個鎖
*越復雜的加鎖方案越有可能造成死鎖--設(shè)計應力求簡單
*盡管釋放鎖的順序和死鎖是無關(guān)的,但最好還是以獲得鎖的相反順序來釋放鎖。
/*********************
* 內(nèi)核同步方法
********************/
前面討論了競爭如何產(chǎn)生以及怎么去解決。
下面將介紹linux為解決競爭問題而提供的同步方法
(1)原子整數(shù)操作
原子操作可以保證指令以原子的方式運行--執(zhí)行過程不能被打斷。
原子操作把讀取和改變變量的行為包含在一個單步中執(zhí)行,
從而避免了競爭,如:
線程1 線程2
atomic i(7->8) -
- atomic i(8->9)
內(nèi)核提供了針對整數(shù)和單獨的位進行的原子操作。
針對整數(shù)的原子操作只能對atomic_t類型的數(shù)據(jù)進行訪問。
定義在<asm/atomic.h>
typedef struct { volatile int counter; } atomic_t;
a.定義
atomic_t t;
atomic_t u = ATOMIC_INIT(0); /* 定義u并初始化為0 */
b.操作
都定義在<asm/atomic.h>中。
原子操作通常是內(nèi)聯(lián)函數(shù),往往通過內(nèi)嵌匯編指令來實現(xiàn)。
int atomic_read(atomic_t v);
原子地讀取變量v
void atomic_set(atomic_t *v, int i); // v = i;
void atomic_add(int i, atomic_t *v); // v += i;
void atomic_sub(int i, atomic_t *v); // v -= i;
void atomic_inc(atomic_t *v); // v += 1;
void atomic_dec(atomic_t *v); // v -= 1;
原子地加減1
int atomic_sub_and_test(int i, atomic_t *v)
原子地從v減去i,如果結(jié)果等于0返回真,否則返回假
int atomic_add_negative(int i, atomic_t *v)
結(jié)果為負數(shù)返回真,否則返回假
int atomic_dec_and_test(atomic_t *v); // i -= 1; i == 0;
int atomic_inc_and_test(atomic_t *v); // i += 1; i == 0;
結(jié)果等于0返回真,否則返回假
(2)原子位操作
定義在<asm/bitops.h>。這些函數(shù)針對某個普通的內(nèi)存地址。
可用的位操作:
void set_bit(nr, void *addr);
設(shè)置addr指向的數(shù)據(jù)項的第nr位
*addr |= (0x1 << nr);
void clear_bit(nr, void *addr);
清除addr指向的數(shù)據(jù)項的第nr位
*addr &= ~(0x1 << nr);
test_bit(nr, void *addr);
返回指定位的當前值
*addr & (0x1 << nr);
int test_and_set_bit(nr, void *addr);
設(shè)置nr位的同時返回原來的值
例子:
unsigned long word = 0;
set_bit(0, &word); /* 原子地設(shè)定第0位 */
set_bit(1, &word); /* 原子地設(shè)定第1位 */
clear_bit(1, &word); /* 原子地清空第1位 */
change_bit(0, &word); /* 原子地翻轉(zhuǎn)第0位 */
/* 原子地設(shè)置第0位并返回設(shè)置前的值(0) */
if(test_and_set_bit(0, &word){
/* 不為真 */
}
word = 7; /* 合法 */
內(nèi)核還提供了一組非原子位操作函數(shù),名字前面多了兩個下劃線。如__test_bit()
如果不需要原子性操作,比如已經(jīng)用鎖保護了數(shù)據(jù),
用這些非原子的位操作可能更快。
(3)原子性與順序性的比較
原子性確保指令執(zhí)行期間不被打斷,要么全部執(zhí)行完,要么不執(zhí)行。
順序性確保指令的執(zhí)行順序不改變。通過屏障指令(barrier)來保證。
能使用原子操作的時候,就盡量不要使用復雜的加鎖機制。
(4)spinlock自旋鎖
原子操作只能針對整數(shù)和位,面對更復雜的臨界區(qū),
比如從一個數(shù)據(jù)結(jié)構(gòu)中移出數(shù)據(jù),處理完后再加入到另一個數(shù)據(jù)結(jié)構(gòu)中。
這種情況就需要更復雜的同步方法--鎖來提供保護。
linux內(nèi)核中最常見的鎖是自旋鎖(spin lock)。
自旋鎖最多只能被一個可執(zhí)行線程持有,等待鎖的進程采用忙循環(huán)等待(只針對smp)。
因為忙循環(huán)很消耗處理器時間,所以自旋鎖不能被長時間持有。
持有自旋鎖的時間最好小于完成兩次上下文切換的時間,
如果時間太長,就需要采用另外的加鎖方式。
自旋鎖定義在<asm/spinlock.h>和<linux/spinlock.h>
自旋鎖可以在中斷處理程序中使用。
在這種情況下,其他代碼要獲得鎖時必須首先關(guān)閉中斷。
在單處理器上,加鎖操作只是簡單地把內(nèi)核搶占關(guān)閉。
等待自旋鎖時程序不會睡眠,同樣,一旦擁有了spinlock,代碼也不能睡眠。
此時,必須注意每個調(diào)用的系統(tǒng)函數(shù),因為有些函數(shù)是可能導致睡眠的(如kmallspin_unlock_irqstoreoc)。
可以選定CONFIG_DEBUG_SPINLOCK選項,打開spinlock的調(diào)試功能
a.spinlock的初始化
靜態(tài)
spinlock_t my_lock = SPIN_LOCK_UNLOCKED;
動態(tài)
void spin_lock_init(spinlock_t *lock);
b.加鎖函數(shù)
void spin_lock(spinlock_t *lock);
void spin_lock_irqsave(spinlock_t *lock, unsigned long flags);
void spin_lock_irq(spinlock_t *lock);
void spin_lock_bh(spinlock_t *lock);
如果我們的一個自旋鎖可以被中斷處理程序或者是中斷的下半部獲得,則必須使用禁止中斷的spin_lock形式。
c.解鎖函數(shù)
void spin_unlock(spinlock_t *lock);
void spin_unlock_irqrestore(spinlock_t *lock, unsigned long flags);
void spin_unlock_irq(spinlock_t *lock);
void spin_unlock_bh(spinlock_t *lock);
d.非阻塞的自旋鎖操作
int spin_trylock(spinlock_t *lock);
int spin_trylock_bh(spinlock_t *lock);
成功(即獲得鎖)時返回非零,否則返回0
(5)讀取者/寫入者自旋鎖(reader/writer)
有時候,鎖的用途可以明確分為讀取和寫入。比如對鏈表的更新和檢索。
寫操作必須要求并發(fā)保護,但多個并發(fā)的讀操作是安全的。
這樣可以提高鎖的使用效率。定義在<linux/spinlock.h>
a.初始化
靜態(tài)
rwlock_t my_rwlock = RW_LOCK_UNLOCKED;
動態(tài)
rwlock_t my_rwlock;
rwlock_init(&my_rwlock);
b.加鎖
read/write_lock()
read/write_lock_irq()
read/write_lock_irqsave()
read/write_lock_bh()
c.解鎖
read/write_unlock()
read/write_unlock_irq()
read/write_unlock_irqsave()
read/write_unlock_bh()
d.其他
write_trylock()
rw_is_locked()
e.使用方式
rwlock_t my_rwlock = RW_LOCK_UNLOCKED;
讀者
read_lock(&my_rwlock);
/* 臨界區(qū)(只讀) */
read_unlock(&my_rwlock);
寫者
write_lock(&my_rwlock);
/* 臨界區(qū)(讀寫) */
write_unlock(&my_rwlock);
通常,讀鎖和寫鎖位于完全分開的代碼中。不能把一個讀鎖升級為寫鎖。如:
read_lock(&my_rwlock);
...
write_lock(&my_rwlock);
這會造成死鎖。
如果代碼不能清晰分成讀和寫兩部分,那么就用普通的spinlock就可以了。
注意!讀/寫者鎖機制要更照顧讀者。
當讀鎖被持有時,寫者必須等待,直到所有的讀者釋放鎖,而其他的讀者卻可以繼續(xù)獲得鎖。
這樣,大量讀者必定會使刮起的寫者處于饑餓狀態(tài)。
如果加鎖時間不長并且代碼不會睡眠(比如中斷處理程序),利用自旋鎖是最佳方案;
如果加鎖時間很長或者代碼在持有鎖時有可能睡眠,則最好用信號量來完成加鎖。
(6)信號量(semaphore)和互斥體
信號量(semaphore)和信號(signal)是完全不同的概念。
linux的信號量是一種睡眠鎖。
如果一個進程a試圖獲得一個已經(jīng)被占用的信號量時,該進程將被送入一個等待隊列,然后睡眠。
持有信號量的進程b將信號量釋放后,進程a將被喚醒并獲得信號量。
信號量比自旋鎖能提供更好的處理器利用率,但信號量的開銷更大。
信號量的特性
a.信號量適用于鎖會被長期持有時,因為其開銷比較大
b.只有在進程上下文中才能獲得信號量,因為獲取信號量時可能導致睡眠,不適用于中斷上下文
c.可以在持有信號量時去睡眠,因此可以在持有信號量的時候和用戶空間同步
d.不能在持有信號量的同時持有自旋鎖
e.信號量可以允許任意數(shù)量的鎖持有者,而自旋鎖在一個時刻只能有一個持有者。
只擁有一個持有者的信號量稱為互斥信號量
(7)信號量的實現(xiàn)
信號量的實現(xiàn)與體系結(jié)構(gòu)相關(guān),具體實現(xiàn)定義在<asm/semaphore.h>
<linux/semaphore.h>
a.信號量的聲明
靜態(tài)
static DECLARE_SEMAPHORE_GENERIC(name, count);
或聲明一個互斥信號量
static DECLARE_MUTEX(name);
動態(tài)
struct semaphore sem; /*信號量常作為一個大的結(jié)構(gòu)體中的一部分*/
sema_init(&sem, count);
init_MUTEX(&sem); /* 初始化互斥信號量 */
init_MUTEX_LOCKED(&sem); /*初始化后信號量就被鎖定 */
b.獲得信號量
void down(struct semaphore *sem);
減小信號量的值。函數(shù)返回后獲得信號量
int down_interruptible(struct semaphore *sem);;
在等待信號量時,可以被用戶空間程序通過信號中斷。
這是我們常用的版本。
如果操作被中斷,則返回非零值,調(diào)用者無法獲得信號量。
注意!調(diào)用時始終要檢查返回值。
int down_trylock(struct semaphore *sem);
不會休眠,如果不能獲得信號量,立即返回一個非零值
c.釋放信號量
viod up(struct semaphore *sem);
linux也提供了讀取者/寫入者信號量。
#include <linux/rwsem.h>
struct rw_semaphore;
(9)completion機制
在內(nèi)核編程中常見的一種模式是:
在當前線程之外初始化某個活動,然后等待該活動的結(jié)束。
這個活動可能是創(chuàng)建一個新的內(nèi)核線程或者新的用戶空間進程,
對一個已有進程的某個請求,或者某種類型的硬件活動等等。
可以利用信號量對兩個任務進行同步。例如:
在任務1中
struct semaphore sem;
init_MUTEX_LOCKED(&sem);
start_external_task(&sem);
down(&sem);
在任務2中
首先down(&sem),這樣任務1就會在sem上等待;
任務2完成后,調(diào)用up(&sem),任務1恢復執(zhí)行
用信號量來完成這一工作并不太有效,因此內(nèi)核提供了completion機制(出現(xiàn)在2.4.7)來完成這一工作。
a.創(chuàng)建completion
#include <linux/completion.h>
靜態(tài)
DECLARE_COMPLETION(my_completion);
動態(tài)
struct completion my_completion;
init_completion(&my_completion);
b.等待completion
void wait_for_completion(struct completion *c);
執(zhí)行一個非中斷的等待,用戶空間無法通過signal來中斷這個進程。
c.完成completion
void complete(struct completion *);
發(fā)信號喚醒任何等待任務
(2)如何禁止內(nèi)核搶占
由于內(nèi)核是搶占性的,內(nèi)核中的進程在任何時刻都可能停下來被更高優(yōu)先級的進程搶占。
有時候需要把這一特性關(guān)閉。
用戶驅(qū)動直接關(guān)閉內(nèi)核搶占的情況不多,但一些內(nèi)核機制的內(nèi)部需要這一功能。
比如單cpu系統(tǒng)上的spinlock鎖,
spin_lock的內(nèi)部并沒有忙等待,而只是關(guān)閉了內(nèi)核搶占。
a.搶占的相關(guān)函數(shù)
*preempt_disable()
增加搶占計數(shù)值,從而禁止內(nèi)核搶占。
如果搶占計數(shù)為0則內(nèi)核可以進行搶占,如果為1或更大的數(shù)值,則禁止搶占
*preempt_enable()
減少搶占計數(shù),并當該值降為0時檢查和執(zhí)行被掛起的需調(diào)度的任務
*preempt_enable_no_resched()
激活內(nèi)核搶占,但不檢查被掛起的需調(diào)度的任務
*preempt_count()
返回搶占計數(shù)
b.每個cpu上的數(shù)據(jù)訪問
int cpu;
/*禁止內(nèi)核搶占,并將cpu設(shè)置為當前處理器 */
cpu=get_cpu();
對每個處理器的數(shù)據(jù)進行操作
put_cpu();
另外,推薦嵌入式學習良心之作,韋東山老師嵌入式教程。