www.久久久久|狼友网站av天堂|精品国产无码a片|一级av色欲av|91在线播放视频|亚洲无码主播在线|国产精品草久在线|明星AV网站在线|污污内射久久一区|婷婷综合视频网站

當(dāng)前位置:首頁 > > 架構(gòu)師社區(qū)
[導(dǎo)讀]來自:程序員cxuan ? ?前言 談到并發(fā),我們不得不說AQS(AbstractQueuedSynchronizer),所謂的AQS即是抽象的隊(duì)列式的同步器,內(nèi)部定義了很多鎖相關(guān)的方法,我們熟知的ReentrantLock、ReentrantReadWriteLock、CountDownLatch、Semaphore等都是基于AQS來實(shí)現(xiàn)的


我畫了35張圖就是為了讓你深入 AQS

來自:程序員cxuan


我畫了35張圖就是為了讓你深入 AQS
   前言

談到并發(fā),我們不得不說AQS(AbstractQueuedSynchronizer),所謂的AQS即是抽象的隊(duì)列式的同步器,內(nèi)部定義了很多鎖相關(guān)的方法,我們熟知的ReentrantLock、ReentrantReadWriteLock、CountDownLatch、Semaphore等都是基于AQS來實(shí)現(xiàn)的。

我們先看下AQS相關(guān)的UML圖:

我畫了35張圖就是為了讓你深入 AQS

思維導(dǎo)圖(高清無損 AV 畫質(zhì)長圖.pdf 關(guān)注公眾號回復(fù) AQS 獲?。?/span>

我畫了35張圖就是為了讓你深入 AQS

我畫了35張圖就是為了讓你深入 AQS

1

 AQS實(shí)現(xiàn)原理

AQS中 維護(hù)了一個volatile int state(代表共享資源)和一個FIFO線程等待隊(duì)列(多線程爭用資源被阻塞時會進(jìn)入此隊(duì)列)。

這里volatile能夠保證多線程下的可見性,當(dāng)state=1則代表當(dāng)前對象鎖已經(jīng)被占有,其他線程來加鎖時則會失敗,加鎖失敗的線程會被放入一個FIFO的等待隊(duì)列中,比列會被UNSAFE.park()操作掛起,等待其他獲取鎖的線程釋放鎖才能夠被喚醒。

另外state的操作都是通過CAS來保證其并發(fā)修改的安全性。

具體原理我們可以用一張圖來簡單概括:

我畫了35張圖就是為了讓你深入 AQS         

AQS 中提供了很多關(guān)于鎖的實(shí)現(xiàn)方法,

  • getState():獲取鎖的標(biāo)志state值
  • setState():設(shè)置鎖的標(biāo)志state值
  • tryAcquire(int):獨(dú)占方式獲取鎖。嘗試獲取資源,成功則返回true,失敗則返回false。
  • tryRelease(int):獨(dú)占方式釋放鎖。嘗試釋放資源,成功則返回true,失敗則返回false。

這里還有一些方法并沒有列出來,接下來我們以ReentrantLock作為突破點(diǎn)通過源碼和畫圖的形式一步步了解AQS內(nèi)部實(shí)現(xiàn)原理。

我畫了35張圖就是為了讓你深入 AQS

2

 目錄結(jié)構(gòu)

文章準(zhǔn)備模擬多線程競爭鎖、釋放鎖的場景來進(jìn)行分析AQS源碼:

三個線程(線程一、線程二、線程三)同時來加鎖/釋放鎖

目錄如下:

  • 線程一加鎖成功時 AQS 內(nèi)部實(shí)現(xiàn)
  • 線程二/三加鎖失敗時 AQS 中等待隊(duì)列的數(shù)據(jù)模型
  • 線程一釋放鎖及線程二獲取鎖實(shí)現(xiàn)原理
  • 通過線程場景來講解公平鎖具體實(shí)現(xiàn)原理
  • 通過線程場景來講解Condition中a wait() signal() 實(shí)現(xiàn)原理

這里會通過畫圖來分析每個線程加鎖、釋放鎖后AQS內(nèi)部的數(shù)據(jù)結(jié)構(gòu)和實(shí)現(xiàn)原理

我畫了35張圖就是為了讓你深入 AQS

3

 場景分析

線程一加鎖成功

如果同時有三個線程并發(fā)搶占鎖,此時線程一搶占鎖成功,線程二線程三搶占鎖失敗,具體執(zhí)行流程如下:

我畫了35張圖就是為了讓你深入 AQS

此時AQS內(nèi)部數(shù)據(jù)為:

我畫了35張圖就是為了讓你深入 AQS

線程二、線程三加鎖失?。?/span>

我畫了35張圖就是為了讓你深入 AQS

有圖可以看出,等待隊(duì)列中的節(jié)點(diǎn)Node是一個雙向鏈表,這里SIGNALNodewaitStatus屬性,Node中還有一個nextWaiter屬性,這個并未在圖中畫出來,這個到后面Condition會具體講解的。

具體看下?lián)屨兼i代碼實(shí)現(xiàn):

java.util.concurrent.locks.ReentrantLock .NonfairSync:

static final class NonfairSync extends Sync {
    
    final void lock() {
        if (compareAndSetState(01))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

這里使用的ReentrantLock非公平鎖,線程進(jìn)來直接利用CAS嘗試搶占鎖,如果搶占成功state值回被改為1,且設(shè)置對象獨(dú)占鎖線程為當(dāng)前線程。如下所示:

protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

protected final void setExclusiveOwnerThread(Thread thread) {
    exclusiveOwnerThread = thread;
}

線程二搶占鎖失敗

我們按照真實(shí)場景來分析,線程一搶占鎖成功后,state變?yōu)?,線程二通過CAS修改state變量必然會失敗。此時AQSFIFO(First In First Out 先進(jìn)先出)隊(duì)列中數(shù)據(jù)如圖所示:

我畫了35張圖就是為了讓你深入 AQS

我們將線程二執(zhí)行的邏輯一步步拆解來看:

java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire():

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

先看看tryAcquire()的具體實(shí)現(xiàn):java.util.concurrent.locks.ReentrantLock .nonfairTryAcquire():

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

nonfairTryAcquire()方法中首先會獲取state的值,如果不為0則說明當(dāng)前對象的鎖已經(jīng)被其他線程所占有,接著判斷占有鎖的線程是否為當(dāng)前線程,如果是則累加state值,這就是可重入鎖的具體實(shí)現(xiàn),累加state值,釋放鎖的時候也要依次遞減state值。

如果state為0,則執(zhí)行CAS操作,嘗試更新state值為1,如果更新成功則代表當(dāng)前線程加鎖成功。

線程二為例,因?yàn)?strong>線程一已經(jīng)將state修改為1,所以線程二通過CAS修改state的值不會成功。加鎖失敗。

線程二執(zhí)行tryAcquire()后會返回false,接著執(zhí)行addWaiter(Node.EXCLUSIVE)邏輯,將自己加入到一個FIFO等待隊(duì)列中,代碼實(shí)現(xiàn)如下:

java.util.concurrent.locks.AbstractQueuedSynchronizer.addWaiter():

private Node addWaiter(Node mode) {    
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

這段代碼首先會創(chuàng)建一個和當(dāng)前線程綁定的Node節(jié)點(diǎn),Node為雙向鏈表。此時等待對內(nèi)中的tail指針為空,直接調(diào)用enq(node)方法將當(dāng)前線程加入等待隊(duì)列尾部:

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) {
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

第一遍循環(huán)時tail指針為空,進(jìn)入if邏輯,使用CAS操作設(shè)置head指針,將head指向一個新創(chuàng)建的Node節(jié)點(diǎn)。此時AQS中數(shù)據(jù):

我畫了35張圖就是為了讓你深入 AQS

執(zhí)行完成之后,head、tail、t都指向第一個Node元素。

接著執(zhí)行第二遍循環(huán),進(jìn)入else邏輯,此時已經(jīng)有了head節(jié)點(diǎn),這里要操作的就是將線程二對應(yīng)的Node節(jié)點(diǎn)掛到head節(jié)點(diǎn)后面。此時隊(duì)列中就有了兩個Node節(jié)點(diǎn):

我畫了35張圖就是為了讓你深入 AQS

addWaiter()方法執(zhí)行完后,會返回當(dāng)前線程創(chuàng)建的節(jié)點(diǎn)信息。繼續(xù)往后執(zhí)行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)邏輯,此時傳入的參數(shù)為線程二對應(yīng)的Node節(jié)點(diǎn)信息:

java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued():

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null// help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndChecknIterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

acquireQueued()這個方法會先判斷當(dāng)前傳入的Node對應(yīng)的前置節(jié)點(diǎn)是否為head,如果是則嘗試加鎖。加鎖成功過則將當(dāng)前節(jié)點(diǎn)設(shè)置為head節(jié)點(diǎn),然后空置之前的head節(jié)點(diǎn),方便后續(xù)被垃圾回收掉。

如果加鎖失敗或者Node的前置節(jié)點(diǎn)不是head節(jié)點(diǎn),就會通過shouldParkAfterFailedAcquire方法 將head節(jié)點(diǎn)的waitStatus變?yōu)榱?/span>SIGNAL=-1,最后執(zhí)行parkAndChecknIterrupt方法,調(diào)用LockSupport.park()掛起當(dāng)前線程。

此時AQS中的數(shù)據(jù)如下圖:

我畫了35張圖就是為了讓你深入 AQS

此時線程二就靜靜的待在AQS的等待隊(duì)列里面了,等著其他線程釋放鎖來喚醒它。

線程三搶占鎖失敗

看完了線程二搶占鎖失敗的分析,那么再來分析線程三搶占鎖失敗就很簡單了,先看看addWaiter(Node mode)方法:

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

此時等待隊(duì)列的tail節(jié)點(diǎn)指向線程二,進(jìn)入if邏輯后,通過CAS指令將tail節(jié)點(diǎn)重新指向線程三。接著線程三調(diào)用enq()方法執(zhí)行入隊(duì)操作,和上面線程二執(zhí)行方式是一致的,入隊(duì)后會修改線程二對應(yīng)的Node中的waitStatus=SIGNAL。最后線程三也會被掛起。此時等待隊(duì)列的數(shù)據(jù)如圖:

我畫了35張圖就是為了讓你深入 AQS

線程一釋放鎖

現(xiàn)在來分析下釋放鎖的過程,首先是線程一釋放鎖,釋放鎖后會喚醒head節(jié)點(diǎn)的后置節(jié)點(diǎn),也就是我們現(xiàn)在的線程二,具體操作流程如下:

我畫了35張圖就是為了讓你深入 AQS

執(zhí)行完后等待隊(duì)列數(shù)據(jù)如下:

我畫了35張圖就是為了讓你深入 AQS

此時線程二已經(jīng)被喚醒,繼續(xù)嘗試獲取鎖,如果獲取鎖失敗,則會繼續(xù)被掛起。如果獲取鎖成功,則AQS中數(shù)據(jù)如圖:

我畫了35張圖就是為了讓你深入 AQS

接著還是一步步拆解來看,先看看線程一釋放鎖的代碼:

java.util.concurrent.locks.AbstractQueuedSynchronizer.release()

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

這里首先會執(zhí)行tryRelease()方法,這個方法具體實(shí)現(xiàn)在ReentrantLock中,如果tryRelease執(zhí)行成功,則繼續(xù)判斷head節(jié)點(diǎn)的waitStatus是否為0,前面我們已經(jīng)看到過,headwaitStatueSIGNAL(-1),這里就會執(zhí)行unparkSuccessor()方法來喚醒head的后置節(jié)點(diǎn),也就是我們上面圖中線程二對應(yīng)的Node節(jié)點(diǎn)。

此時看ReentrantLock.tryRelease()中的具體實(shí)現(xiàn):

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

執(zhí)行完ReentrantLock.tryRelease()后,state被設(shè)置成0,Lock對象的獨(dú)占鎖被設(shè)置為null。此時看下AQS中的數(shù)據(jù):

我畫了35張圖就是為了讓你深入 AQS

接著執(zhí)行java.util.concurrent.locks.AbstractQueuedSynchronizer.unparkSuccessor()方法,喚醒head的后置節(jié)點(diǎn):

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

這里主要是將head節(jié)點(diǎn)的waitStatus設(shè)置為0,然后解除head節(jié)點(diǎn)next的指向,使head節(jié)點(diǎn)空置,等待著被垃圾回收。

此時重新將head指針指向線程二對應(yīng)的Node節(jié)點(diǎn),且使用LockSupport.unpark方法來喚醒線程二

被喚醒的線程二會接著嘗試獲取鎖,用CAS指令修改state數(shù)據(jù)。執(zhí)行完成后可以查看AQS中數(shù)據(jù):

我畫了35張圖就是為了讓你深入 AQS

此時線程二被喚醒,線程二接著之前被park的地方繼續(xù)執(zhí)行,繼續(xù)執(zhí)行acquireQueued()方法。

線程二喚醒繼續(xù)加鎖

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null// help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

此時線程二被喚醒,繼續(xù)執(zhí)行for循環(huán),判斷線程二的前置節(jié)點(diǎn)是否為head,如果是則繼續(xù)使用tryAcquire()方法來嘗試獲取鎖,其實(shí)就是使用CAS操作來修改state值,如果修改成功則代表獲取鎖成功。接著將線程二設(shè)置為head節(jié)點(diǎn),然后空置之前的head節(jié)點(diǎn)數(shù)據(jù),被空置的節(jié)點(diǎn)數(shù)據(jù)等著被垃圾回收。

此時線程三獲取鎖成功,AQS中隊(duì)列數(shù)據(jù)如下:

我畫了35張圖就是為了讓你深入 AQS

等待隊(duì)列中的數(shù)據(jù)都等待著被垃圾回收。

線程二釋放鎖/線程三加鎖

當(dāng)線程二釋放鎖時,會喚醒被掛起的線程三,流程和上面大致相同,被喚醒的線程三會再次嘗試加鎖,具體代碼可以參考上面內(nèi)容。具體流程圖如下:

我畫了35張圖就是為了讓你深入 AQS

此時AQS中隊(duì)列數(shù)據(jù)如圖:

我畫了35張圖就是為了讓你深入 AQS

我畫了35張圖就是為了讓你深入 AQS

4

 公平鎖實(shí)現(xiàn)原理

上面所有的加鎖場景都是基于非公平鎖來實(shí)現(xiàn)的,非公平鎖ReentrantLock的默認(rèn)實(shí)現(xiàn),那我們接著來看一下公平鎖的實(shí)現(xiàn)原理,這里先用一張圖來解釋公平鎖非公平鎖的區(qū)別:

非公平鎖執(zhí)行流程:

我畫了35張圖就是為了讓你深入 AQS

這里我們還是用之前的線程模型來舉例子,當(dāng)線程二釋放鎖的時候,喚醒被掛起的線程三,線程三執(zhí)行tryAcquire()方法使用CAS操作來嘗試修改state值,如果此時又來了一個線程四也來執(zhí)行加鎖操作,同樣會執(zhí)行tryAcquire()方法。

這種情況就會出現(xiàn)競爭,線程四如果獲取鎖成功,線程三仍然需要待在等待隊(duì)列中被掛起。這就是所謂的非公平鎖,線程三辛辛苦苦排隊(duì)等到自己獲取鎖,卻眼巴巴的看到線程四插隊(duì)獲取到了鎖。

公平鎖執(zhí)行流程:

我畫了35張圖就是為了讓你深入 AQS

公平鎖在加鎖的時候,會先判斷AQS等待隊(duì)列中是存在節(jié)點(diǎn),如果存在節(jié)點(diǎn)則會直接入隊(duì)等待,具體代碼如下.

公平鎖在獲取鎖是也是首先會執(zhí)行acquire()方法,只不過公平鎖單獨(dú)實(shí)現(xiàn)了tryAcquire()方法:

#java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire():

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

這里會執(zhí)行ReentrantLock中公平鎖的tryAcquire()方法

#java.util.concurrent.locks.ReentrantLock.FairSync.tryAcquire():

static final class FairSync extends Sync {
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

這里會先判斷state值,如果不為0且獲取鎖的線程不是當(dāng)前線程,直接返回false代表獲取鎖失敗,被加入等待隊(duì)列。如果是當(dāng)前線程則可重入獲取鎖。

如果state=0則代表此時沒有線程持有鎖,執(zhí)行hasQueuedPredecessors()判斷AQS等待隊(duì)列中是否有元素存在,如果存在其他等待線程,那么自己也會加入到等待隊(duì)列尾部,做到真正的先來后到,有序加鎖。具體代碼如下:

#java.util.concurrent.locks.AbstractQueuedSynchronizer.hasQueuedPredecessors():

public final boolean hasQueuedPredecessors() {
    Node t = tail;
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

這段代碼很有意思,返回false代表隊(duì)列中沒有節(jié)點(diǎn)或者僅有一個節(jié)點(diǎn)是當(dāng)前線程創(chuàng)建的節(jié)點(diǎn)。返回true則代表隊(duì)列中存在等待節(jié)點(diǎn),當(dāng)前線程需要入隊(duì)等待。

我畫了35張圖就是為了讓你深入 AQS

先判斷head是否等于tail,如果隊(duì)列中只有一個Node節(jié)點(diǎn),那么head會等于tail,接著判斷head的后置節(jié)點(diǎn),這里肯定會是null,如果此Node節(jié)點(diǎn)對應(yīng)的線程和當(dāng)前的線程是同一個線程,那么則會返回false,代表沒有等待節(jié)點(diǎn)或者等待節(jié)點(diǎn)就是當(dāng)前線程創(chuàng)建的Node節(jié)點(diǎn)。此時當(dāng)前線程會嘗試獲取鎖。

如果headtail不相等,說明隊(duì)列中有等待線程創(chuàng)建的節(jié)點(diǎn),此時直接返回true,如果只有一個節(jié)點(diǎn),而此節(jié)點(diǎn)的線程和當(dāng)前線程不一致,也會返回true

非公平鎖公平鎖的區(qū)別:非公平鎖性能高于公平鎖性能。非公平鎖可以減少CPU喚醒線程的開銷,整體的吞吐效率會高點(diǎn),CPU也不必取喚醒所有線程,會減少喚起線程的數(shù)量

非公平鎖性能雖然優(yōu)于公平鎖,但是會存在導(dǎo)致線程饑餓的情況。在最壞的情況下,可能存在某個線程一直獲取不到鎖。不過相比性能而言,饑餓問題可以暫時忽略,這可能就是ReentrantLock默認(rèn)創(chuàng)建非公平鎖的原因之一了。

我畫了35張圖就是為了讓你深入 AQS

5

 Condition實(shí)現(xiàn)原理


Condition 簡介

上面已經(jīng)介紹了AQS所提供的核心功能,當(dāng)然它還有很多其他的特性,這里我們來繼續(xù)說下Condition這個組件。

Condition是在java 1.5中才出現(xiàn)的,它用來替代傳統(tǒng)的Objectwait()notify()實(shí)現(xiàn)線程間的協(xié)作,相比使用Objectwait()、notify(),使用Condition中的await()、signal()這種方式實(shí)現(xiàn)線程間協(xié)作更加安全和高效。因此通常來說比較推薦使用Condition

其中AbstractQueueSynchronizer中實(shí)現(xiàn)了Condition中的方法,主要對外提供awaite(Object.wait())signal(Object.notify())調(diào)用。

Condition Demo示例

使用示例代碼:

/**
 * ReentrantLock 實(shí)現(xiàn)源碼學(xué)習(xí)
 * @author 一枝花算不算浪漫
 * @date 2020/4/28 7:20
 */

public class ReentrantLockDemo {
    static ReentrantLock lock = new ReentrantLock();

    public static void main(String[] args) {
        Condition condition = lock.newCondition();

        new Thread(() -> {
            lock.lock();
            try {
                System.out.println("線程一加鎖成功");
                System.out.println("線程一執(zhí)行await被掛起");
                condition.await();
                System.out.println("線程一被喚醒成功");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
                System.out.println("線程一釋放鎖成功");
            }
        }).start();

        new Thread(() -> {
            lock.lock();
            try {
                System.out.println("線程二加鎖成功");
                condition.signal();
                System.out.println("線程二喚醒線程一");
            } finally {
                lock.unlock();
                System.out.println("線程二釋放鎖成功");
            }
        }).start();
    }
}

執(zhí)行結(jié)果如下圖:

我畫了35張圖就是為了讓你深入 AQS

這里線程一先獲取鎖,然后使用await()方法掛起當(dāng)前線程并釋放鎖,線程二獲取鎖后使用signal喚醒線程一。

Condition實(shí)現(xiàn)原理圖解

我們還是用上面的demo作為實(shí)例,執(zhí)行的流程如下:

我畫了35張圖就是為了讓你深入 AQS

線程一執(zhí)行await()方法:

先看下具體的代碼實(shí)現(xiàn),#java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject.await()

 public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null// clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

await()方法中首先調(diào)用addConditionWaiter()將當(dāng)前線程加入到Condition隊(duì)列中。

執(zhí)行完后我們可以看下Condition隊(duì)列中的數(shù)據(jù):

我畫了35張圖就是為了讓你深入 AQS

具體實(shí)現(xiàn)代碼為:

private Node addConditionWaiter() {
    Node t = lastWaiter;
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

這里會用當(dāng)前線程創(chuàng)建一個Node節(jié)點(diǎn),waitStatusCONDITION。接著會釋放該節(jié)點(diǎn)的鎖,調(diào)用之前解析過的release()方法,釋放鎖后此時會喚醒被掛起的線程二,線程二會繼續(xù)嘗試獲取鎖。

接著調(diào)用isOnSyncQueue()方法判斷當(dāng)前節(jié)點(diǎn)是否為Condition隊(duì)列中的頭部節(jié)點(diǎn),如果是則調(diào)用LockSupport.park(this)掛起Condition中當(dāng)前線程。此時線程一被掛起,線程二獲取鎖成功。

具體流程如下圖:

我畫了35張圖就是為了讓你深入 AQS

線程二執(zhí)行signal()方法:

首先我們考慮下線程二已經(jīng)獲取到鎖,此時AQS等待隊(duì)列中已經(jīng)沒有了數(shù)據(jù)。

接著就來看看線程二喚醒線程一的具體執(zhí)行流程:

public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

先判斷當(dāng)前線程是否為獲取鎖的線程,如果不是則直接拋出異常。接著調(diào)用doSignal()方法來喚醒線程。

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

/**
 * Inserts node into queue, initializing if necessary. See picture above.
 * @param node the node to insert
 * @return node's predecessor
 */

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

這里先從transferForSignal()方法來看,通過上面的分析我們知道Condition隊(duì)列中只有線程一創(chuàng)建的一個Node節(jié)點(diǎn),且waitStatueCONDITION,先通過CAS修改當(dāng)前節(jié)點(diǎn)waitStatus為0,然后執(zhí)行enq()方法將當(dāng)前線程加入到等待隊(duì)列中,并返回當(dāng)前線程的前置節(jié)點(diǎn)。

加入等待隊(duì)列的代碼在上面也已經(jīng)分析過,此時等待隊(duì)列中數(shù)據(jù)如下圖:

我畫了35張圖就是為了讓你深入 AQS

接著開始通過CAS修改當(dāng)前節(jié)點(diǎn)的前置節(jié)點(diǎn)waitStatusSIGNAL,并且喚醒當(dāng)前線程。此時AQS中等待隊(duì)列數(shù)據(jù)為:

我畫了35張圖就是為了讓你深入 AQS

線程一被喚醒后,繼續(xù)執(zhí)行await()方法中的 while 循環(huán)。

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null// clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

因?yàn)榇藭r線程一的waitStatus已經(jīng)被修改為0,所以執(zhí)行isOnSyncQueue()方法會返回false。跳出while循環(huán)。

接著執(zhí)行acquireQueued()方法,這里之前也有講過,嘗試重新獲取鎖,如果獲取鎖失敗繼續(xù)會被掛起。直到另外線程釋放鎖才被喚醒。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null// help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

此時線程一的流程都已經(jīng)分析完了,等線程二釋放鎖后,線程一會繼續(xù)重試獲取鎖,流程到此終結(jié)。

Condition總結(jié)

我們總結(jié)下 Condition 和 wait/notify 的比較:

  • Condition 可以精準(zhǔn)的對多個不同條件進(jìn)行控制,wait/notify 只能和 synchronized 關(guān)鍵字一起使用,并且只能喚醒一個或者全部的等待隊(duì)列;

  • Condition 需要使用 Lock 進(jìn)行控制,使用的時候要注意 lock() 后及時的 unlock(),Condition 有類似于 await 的機(jī)制,因此不會產(chǎn)生加鎖方式而產(chǎn)生的死鎖出現(xiàn),同時底層實(shí)現(xiàn)的是 park/unpark 的機(jī)制,因此也不會產(chǎn)生先喚醒再掛起的死鎖,一句話就是不會產(chǎn)生死鎖,但是 wait/notify 會產(chǎn)生先喚醒再掛起的死鎖。

我畫了35張圖就是為了讓你深入 AQS

6

 總結(jié)


這里用了一步一圖的方式結(jié)合三個線程依次加鎖/釋放鎖來展示了ReentrantLock的實(shí)現(xiàn)方式和實(shí)現(xiàn)原理,而ReentrantLock底層就是基于AQS實(shí)現(xiàn)的,所以我們也對AQS有了深刻的理解。

另外還介紹了公平鎖非公平鎖的實(shí)現(xiàn)原理,Condition的實(shí)現(xiàn)原理,基本上都是使用源碼+繪圖的講解方式,盡量讓大家更容易去理解。

end


特別推薦一個分享架構(gòu)+算法的優(yōu)質(zhì)內(nèi)容,還沒關(guān)注的小伙伴,可以長按關(guān)注一下:

我畫了35張圖就是為了讓你深入 AQS

長按訂閱更多精彩▼

我畫了35張圖就是為了讓你深入 AQS

如有收獲,點(diǎn)個在看,誠摯感謝

免責(zé)聲明:本文內(nèi)容由21ic獲得授權(quán)后發(fā)布,版權(quán)歸原作者所有,本平臺僅提供信息存儲服務(wù)。文章僅代表作者個人觀點(diǎn),不代表本平臺立場,如有問題,請聯(lián)系我們,謝謝!

本站聲明: 本文章由作者或相關(guān)機(jī)構(gòu)授權(quán)發(fā)布,目的在于傳遞更多信息,并不代表本站贊同其觀點(diǎn),本站亦不保證或承諾內(nèi)容真實(shí)性等。需要轉(zhuǎn)載請聯(lián)系該專欄作者,如若文章內(nèi)容侵犯您的權(quán)益,請及時聯(lián)系本站刪除。
換一批
延伸閱讀

LED驅(qū)動電源的輸入包括高壓工頻交流(即市電)、低壓直流、高壓直流、低壓高頻交流(如電子變壓器的輸出)等。

關(guān)鍵字: 驅(qū)動電源

在工業(yè)自動化蓬勃發(fā)展的當(dāng)下,工業(yè)電機(jī)作為核心動力設(shè)備,其驅(qū)動電源的性能直接關(guān)系到整個系統(tǒng)的穩(wěn)定性和可靠性。其中,反電動勢抑制與過流保護(hù)是驅(qū)動電源設(shè)計(jì)中至關(guān)重要的兩個環(huán)節(jié),集成化方案的設(shè)計(jì)成為提升電機(jī)驅(qū)動性能的關(guān)鍵。

關(guān)鍵字: 工業(yè)電機(jī) 驅(qū)動電源

LED 驅(qū)動電源作為 LED 照明系統(tǒng)的 “心臟”,其穩(wěn)定性直接決定了整個照明設(shè)備的使用壽命。然而,在實(shí)際應(yīng)用中,LED 驅(qū)動電源易損壞的問題卻十分常見,不僅增加了維護(hù)成本,還影響了用戶體驗(yàn)。要解決這一問題,需從設(shè)計(jì)、生...

關(guān)鍵字: 驅(qū)動電源 照明系統(tǒng) 散熱

根據(jù)LED驅(qū)動電源的公式,電感內(nèi)電流波動大小和電感值成反比,輸出紋波和輸出電容值成反比。所以加大電感值和輸出電容值可以減小紋波。

關(guān)鍵字: LED 設(shè)計(jì) 驅(qū)動電源

電動汽車(EV)作為新能源汽車的重要代表,正逐漸成為全球汽車產(chǎn)業(yè)的重要發(fā)展方向。電動汽車的核心技術(shù)之一是電機(jī)驅(qū)動控制系統(tǒng),而絕緣柵雙極型晶體管(IGBT)作為電機(jī)驅(qū)動系統(tǒng)中的關(guān)鍵元件,其性能直接影響到電動汽車的動力性能和...

關(guān)鍵字: 電動汽車 新能源 驅(qū)動電源

在現(xiàn)代城市建設(shè)中,街道及停車場照明作為基礎(chǔ)設(shè)施的重要組成部分,其質(zhì)量和效率直接關(guān)系到城市的公共安全、居民生活質(zhì)量和能源利用效率。隨著科技的進(jìn)步,高亮度白光發(fā)光二極管(LED)因其獨(dú)特的優(yōu)勢逐漸取代傳統(tǒng)光源,成為大功率區(qū)域...

關(guān)鍵字: 發(fā)光二極管 驅(qū)動電源 LED

LED通用照明設(shè)計(jì)工程師會遇到許多挑戰(zhàn),如功率密度、功率因數(shù)校正(PFC)、空間受限和可靠性等。

關(guān)鍵字: LED 驅(qū)動電源 功率因數(shù)校正

在LED照明技術(shù)日益普及的今天,LED驅(qū)動電源的電磁干擾(EMI)問題成為了一個不可忽視的挑戰(zhàn)。電磁干擾不僅會影響LED燈具的正常工作,還可能對周圍電子設(shè)備造成不利影響,甚至引發(fā)系統(tǒng)故障。因此,采取有效的硬件措施來解決L...

關(guān)鍵字: LED照明技術(shù) 電磁干擾 驅(qū)動電源

開關(guān)電源具有效率高的特性,而且開關(guān)電源的變壓器體積比串聯(lián)穩(wěn)壓型電源的要小得多,電源電路比較整潔,整機(jī)重量也有所下降,所以,現(xiàn)在的LED驅(qū)動電源

關(guān)鍵字: LED 驅(qū)動電源 開關(guān)電源

LED驅(qū)動電源是把電源供應(yīng)轉(zhuǎn)換為特定的電壓電流以驅(qū)動LED發(fā)光的電壓轉(zhuǎn)換器,通常情況下:LED驅(qū)動電源的輸入包括高壓工頻交流(即市電)、低壓直流、高壓直流、低壓高頻交流(如電子變壓器的輸出)等。

關(guān)鍵字: LED 隧道燈 驅(qū)動電源
關(guān)閉