一文詳解,死鎖與解決方案(附源碼)
時(shí)間:2021-10-29 16:38:12
手機(jī)看文章
掃描二維碼
隨時(shí)隨地手機(jī)看文章
[導(dǎo)讀]死鎖的現(xiàn)象想象一個(gè)場(chǎng)景,賬戶(hù)A給賬戶(hù)B轉(zhuǎn)賬,同時(shí)賬戶(hù)B也給賬戶(hù)A轉(zhuǎn)賬,兩個(gè)賬戶(hù)都需要鎖住余額,所以通常會(huì)申請(qǐng)兩把鎖,轉(zhuǎn)賬時(shí),先鎖住自己的賬戶(hù),并獲取對(duì)方的鎖,保證同一時(shí)刻只能有一個(gè)線程去執(zhí)行轉(zhuǎn)賬。這時(shí)可能就會(huì)出現(xiàn),對(duì)方給我轉(zhuǎn)賬,同時(shí)我也給對(duì)方轉(zhuǎn)賬,那么雙方都持有自己的鎖,且嘗試去...
死鎖的現(xiàn)象
想象一個(gè)場(chǎng)景,賬戶(hù)A給賬戶(hù)B轉(zhuǎn)賬,同時(shí)賬戶(hù)B也給賬戶(hù)A轉(zhuǎn)賬,兩個(gè)賬戶(hù)都需要鎖住余額,所以通常會(huì)申請(qǐng)兩把鎖,轉(zhuǎn)賬時(shí),先鎖住自己的賬戶(hù),并獲取對(duì)方的鎖,保證同一時(shí)刻只能有一個(gè)線程去執(zhí)行轉(zhuǎn)賬。
這時(shí)可能就會(huì)出現(xiàn),對(duì)方給我轉(zhuǎn)賬,同時(shí)我也給對(duì)方轉(zhuǎn)賬,那么雙方都持有自己的鎖,且嘗試去獲取對(duì)方的鎖,這就造成可能一直申請(qǐng)不到對(duì)方的鎖,循環(huán)等待,就會(huì)發(fā)生“死鎖”。
一旦發(fā)生死鎖,線程一直占用著資源無(wú)法釋放,又無(wú)法完成轉(zhuǎn)賬,就會(huì)造成系統(tǒng)假死。
什么是死鎖?
“死鎖”就是兩個(gè)或兩個(gè)以上的線程在執(zhí)行過(guò)程中,互相持有對(duì)方所需要的資源,導(dǎo)致這些線程處于等待狀態(tài),無(wú)法繼續(xù)執(zhí)行。若無(wú)外力作用,它們都將無(wú)法繼續(xù)執(zhí)行下去,就進(jìn)入了“永久”阻塞的狀態(tài)。圖1 死鎖的現(xiàn)象
如圖所示,線程1獲取了資源1,同時(shí)去請(qǐng)求獲取資源2,但是線程2已經(jīng)占有資源2了,所以線程1只能等待。同樣的,線程2占有了資源2,要請(qǐng)求獲取資源1,但資源1已經(jīng)被線程1占有了,只能等待。于是線程1和線程2都在等待持有對(duì)方的持有的資源,就會(huì)無(wú)限等待下去,這就是死鎖現(xiàn)象。
模擬發(fā)生死鎖的場(chǎng)景
下面寫(xiě)一段代碼,模擬兩個(gè)線程各自持有了鎖,然后請(qǐng)求獲取對(duì)方持有的鎖,發(fā)生死鎖的現(xiàn)象。
public class DeadLock {
public static String obj1 = "obj1";
public static String obj2 = "obj2";
public static void main(String[] args) {
Thread a = new Thread(new Lock1());
Thread b = new Thread(new Lock2());
a.start();
b.start();
}
static class Lock1 implements Runnable {
@Override
public void run() {
try {
System.out.println("Lock1 running");
synchronized (DeadLock.obj1) {
System.out.println("Lock1 lock obj1");
Thread.sleep(5000);
synchronized (DeadLock.obj2) {
System.out.println("Lock1 lock obj2");
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
static class Lock2 implements Runnable {
@Override
public void run() {
try {
System.out.println("Lock2 running");
synchronized (DeadLock.obj2) {
System.out.println("Lock2 lock obj2");
Thread.sleep(5000);
synchronized (DeadLock.obj1) {
System.out.println("Lock2 lock obj1");
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
程序啟動(dòng)后,從控制臺(tái)輸出,就能看出兩個(gè)線程都沒(méi)有結(jié)束,而是被卡住了。圖2?死鎖demo輸出我們用jvisualVM看下線程的堆棧信息:
圖3?jvisualVM堆棧信息
我們用jvisualVM查看線程的堆棧信息,發(fā)現(xiàn)已經(jīng)檢測(cè)到了死鎖的存在,而且定位到了具體的代碼行。
死鎖產(chǎn)生的原因
死鎖的發(fā)生也必須具備一定的條件,必須具備以下四個(gè)條件:
- 互斥,共享資源 X 和 Y 只能被一個(gè)線程占用;
- 占有且等待,線程01 已經(jīng)取得共享資源 X,在等待共享資源 Y 的時(shí)候,不釋放共享資源 X;
- 不可搶占,其他線程不能強(qiáng)行搶占線程01 占有的資源;
- 循環(huán)等待,線程01 等待線程02 占有的資源,線程02 等待線程01 占有的資源,就是循環(huán)等待。
如何避免死鎖?
死鎖一旦發(fā)生,并沒(méi)有什么好的方法解決,通常我們只能避免死鎖的發(fā)生。
怎么避免呢?那就要看針對(duì)死鎖發(fā)生的原因去解決。
- 首先,“互斥”是沒(méi)有辦法避免的,你想從賬戶(hù)A轉(zhuǎn)賬到賬戶(hù)B,就必須加鎖,就沒(méi)法避免互斥的存在。
- 對(duì)于“占用且等待”這個(gè)條件,我們可以一次性申請(qǐng)所有的資源,這樣就不存在等待了。
- 對(duì)于“不可搶占”這個(gè)條件,占用部分資源的線程進(jìn)一步申請(qǐng)其他資源時(shí),如果申請(qǐng)不到,可以在一定時(shí)間后,主動(dòng)釋放它占有的資源,這樣就解決了不可搶占這個(gè)條件。
- 對(duì)于“循環(huán)等待”,我們可以靠按“次序”申請(qǐng)資源來(lái)預(yù)防。所謂按序申請(qǐng),就是給資源設(shè)定順序,申請(qǐng)的時(shí)候可以先申請(qǐng)序號(hào)小的資源,再申請(qǐng)序號(hào)大的,這樣資源線性化后,自然就不存在循環(huán)等待了。
所以,總結(jié)來(lái)看,避免死鎖的發(fā)生有三種方法:破壞占用且等待的條件、破壞不可搶占條件、破壞循環(huán)等待條件。
1、破壞占用且等待條件
我們要破壞占用且等待,就是一次性申請(qǐng)占有所有的資源。賬戶(hù)A給賬戶(hù)B轉(zhuǎn)賬,就可以一次性申請(qǐng)賬戶(hù)A和賬戶(hù)B的鎖,同時(shí)拿到兩個(gè)鎖之后,在執(zhí)行轉(zhuǎn)賬操作。
public?class?DeadLock2?{
public static void main(String[] args) {
Account a = new Account();
Account b = new Account();
a.transfer(b, 100);
b.transfer(a, 200);
}
static class Allocator {
private List als = new ArrayList<>();
private void Allocator() {
}
synchronized boolean apply(Account from, Account to) {
if (als.contains(from) || als.contains(to)) {
return false;
} else {
als.add(from);
als.add(to);
}
return true;
}
synchronized void clean(Account from, Account to) {
als.remove(from);
als.remove(to);
}
}
static class Account {
private Allocator actr = DeadLock2.getInstance();
private int balance;
void transfer(Account target, int amt) {
while (!actr.apply(this, target)){
}
try {
synchronized (this) {
System.out.println(this.toString() " lock lock1");
synchronized (target) {
System.out.println(this.toString() " lock lock2");
if (this.balance > amt) {
this.balance -= amt;
target.balance = amt;
}
}
}
} finally {
actr.clean(this, target);
}
}
}
private static class SingleTonHoler {
private static Allocator INSTANCE = new Allocator();
}
public static Allocator getInstance() {
return SingleTonHoler.INSTANCE;
}
}
輸出結(jié)果如下:圖4?破壞占用且等待條件輸出
從輸出結(jié)果看出,并沒(méi)有發(fā)生死鎖,一個(gè)賬戶(hù)先獲取了兩把鎖,完成轉(zhuǎn)賬后,另一個(gè)賬號(hào)再獲取到兩把鎖,完成轉(zhuǎn)賬。
上面的demo比較見(jiàn)到,如果賬號(hào)沒(méi)獲取到鎖,會(huì)一直while循環(huán)等待,可以?xún)?yōu)化為notify/wait的方式。
2、?破壞不可搶占條件
破壞不搶占條件,需要發(fā)生死鎖的線程能夠主動(dòng)釋放它占有的資源,但使用synchronized是做不到的。原因?yàn)閟ynchronized申請(qǐng)不到資源時(shí),線程直接進(jìn)入了阻塞狀態(tài),而線程進(jìn)入了阻塞狀態(tài)也就沒(méi)有辦法釋放它占有的資源了。
不過(guò)JDK中的Lock解決這個(gè)問(wèn)題。
使用Lock類(lèi)中的定時(shí)tryLock獲取鎖,可以指定一個(gè)超時(shí)時(shí)限(Timeout),在等待超過(guò)該時(shí)間后tryLock就會(huì)返回一個(gè)失敗信息,也會(huì)釋放其擁有的資源。
public class DeadLock3 {
public static ReentrantLock lock1 = new ReentrantLock();
public static ReentrantLock lock2 = new ReentrantLock();
public static void main(String[] args) {
Thread a = new Thread(new Lock1());
Thread b = new Thread(new Lock2());
a.start();
b.start();
}
static class Lock1 implements Runnable {
@Override
public void run() {
try {
System.out.println("Lock1 running");
while (true) {
if (lock1.tryLock(1, TimeUnit.MILLISECONDS)) {
System.out.println("Lock1 get lock1");
if (lock2.tryLock(1, TimeUnit.MILLISECONDS)) {
System.out.println("Lock·get lock2");
return;
}
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock1.unlock();
lock2.unlock();
}
}
}
static class Lock2 implements Runnable {
@Override
public void run() {
try {
System.out.println("Lock2 running");
while (true) {
if (lock1.tryLock(1, TimeUnit.MILLISECONDS)) {
System.out.println("Lock2 get lock1");
if (lock2.tryLock(1, TimeUnit.MILLISECONDS)) {
System.out.println("Lock2 get lock2");
return;
}
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock1.unlock();
lock2.unlock();
}
}
}
}
輸出結(jié)果如下:圖5?破壞不可搶占條件輸出
從輸出結(jié)果看出,并沒(méi)有發(fā)生死鎖,一個(gè)賬戶(hù)先嘗試獲取兩把鎖,如果超時(shí)沒(méi)有獲取到,就會(huì)下次重試再去獲取,直到獲取成功。
3、破壞循環(huán)等待條件
破壞循環(huán)等待,就是要對(duì)系統(tǒng)中的資源進(jìn)行統(tǒng)一編號(hào),進(jìn)程必須按照資源的編號(hào)順序提出。這樣做就能保證系統(tǒng)不出現(xiàn)死鎖。這就是“資源有序分配法”。代碼如下:
class Account {
private int id;
private int balance;
void transfer(Account target, int amt){
Account left = this;
Account right = target;
if (this.id > target.id) {
left = target;
right = this;
}
synchronized(left){
synchronized(right){
if (this.balance > amt){
this.balance -= amt;
target.balance = amt;
}
}
}
}
}
總結(jié):文章主要講了死鎖發(fā)生的原因以及解決方法,但我們平時(shí)寫(xiě)的代碼,可能邏輯比這里的例子要復(fù)雜很多,如果產(chǎn)生了死鎖,可能會(huì)比較難以定位到,所以我們平時(shí)寫(xiě)代碼時(shí),盡量不要把多個(gè)鎖交織在一起。