设为首页收藏本站

IT技术擎 - 最棒的IT web技术交流社区

 找回密码
 注册为IT技术擎人

QQ登录

只需一步,快速开始

搜索
热搜: php h5 jquery
查看: 31|回复: 0

[其他] Java并发包基石-AQS详解

[复制链接]

1万

主题

1万

帖子

4万

积分

版主

Rank: 7Rank: 7Rank: 7

积分
40184
发表于 2018-5-11 14:55:37 | 显示全部楼层 |阅读模式

目录
    
1 基本实现原理

      
1.1 如何使用
       1.2 设计思想
    2 自定义同步器
      
2.1 同步器代码实现
       2.2 同步器代码测试
    3 源码分析
      
3.1 Node结点
       3.2 独占式
       3.3 共享式
    4 总结 
 Java并发包(JUC)中提供了很多并发工具,这其中,很多我们耳熟能详的并发工具,譬如ReentrangLock、Semaphore,它们的实现都用到了一个共同的基类--AbstractQueuedSynchronizer,简称AQS。AQS是一个用来构建锁和同步器的框架,使用AQS能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的ReentrantLock,Semaphore,其他的诸如ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基于AQS的。当然,我们自己也能利用AQS非常轻松容易地构造出符合我们自己需求的同步器。
  本章我们就一起探究下这个神奇的东东,并对其实现原理进行剖析理解
基本实现原理
  AQS使用一个int成员变量来表示同步状态,通过内置的FIFO队列来完成获取资源线程的排队工作。
  1. privatevolatileintstate;//共享变量,使用volatile修饰保证线程可见性
复制代码
状态信息通过procted类型的getStatesetStatecompareAndSetState进行操作
AQS支持两种同步方式:
  1.独占式
  2.共享式
  这样方便使用者实现不同类型的同步组件,独占式如ReentrantLock,共享式如Semaphore,CountDownLatch,组合式的如ReentrantReadWriteLock。总之,AQS为使用提供了底层支撑,如何组装实现,使用者可以自由发挥。
同步器的设计是基于模板方法模式的,一般的使用方式是这样:
  1.使用者继承AbstractQueuedSynchronizer并重写指定的方法。(这些重写方法很简单,无非是对于共享资源state的获取和释放)  2.将AQS组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。
这其实是模板方法模式的一个很经典的应用。
我们来看看AQS定义的这些可重写的方法:
    protected boolean tryAcquire(int arg) : 独占式获取同步状态,试着获取,成功返回true,反之为false    protected boolean tryRelease(int arg) :独占式释放同步状态,等待中的其他线程此时将有机会获取到同步状态;
    protected int tryAcquireShared(int arg) :共享式获取同步状态,返回值大于等于0,代表获取成功;反之获取失败;
    protected boolean tryReleaseShared(int arg) :共享式释放同步状态,成功为true,失败为false
    protected boolean isHeldExclusively() : 是否在独占模式下被线程占用。
关于AQS的使用,我们来简单总结一下:
  
如何使用
  首先,我们需要去继承AbstractQueuedSynchronizer这个类,然后我们根据我们的需求去重写相应的方法,比如要实现一个独占锁,那就去重写tryAcquire,tryRelease方法,要实现共享锁,就去重写tryAcquireShared,tryReleaseShared;最后,在我们的组件中调用AQS中的模板方法就可以了,而这些模板方法是会调用到我们之前重写的那些方法的。也就是说,我们只需要很小的工作量就可以实现自己的同步组件,重写的那些方法,仅仅是一些简单的对于共享资源state的获取和释放操作,至于像是获取资源失败,线程需要阻塞之类的操作,自然是AQS帮我们完成了。
  
设计思想
  对于使用者来讲,我们无需关心获取资源失败,线程排队,线程阻塞/唤醒等一系列复杂的实现,这些都在AQS中为我们处理好了。我们只需要负责好自己的那个环节就好,也就是获取/释放共享资源state的姿势T_T。很经典的模板方法设计模式的应用,AQS为我们定义好顶级逻辑的骨架,并提取出公用的线程入队列/出队列,阻塞/唤醒等一系列复杂逻辑的实现,将部分简单的可由使用者决定的操作逻辑延迟到子类中去实现即可。

自定义同步器
  
同步器代码实现

上面大概讲了一些关于AQS如何使用的理论性的东西,接下来,我们就来看下实际如何使用,直接采用JDK官方文档中的小例子来说明问题
  1. 1
  2. packagejuc;
  3. 23importjava.util.concurrent.locks.AbstractQueuedSynchronizer;
  4. 45/**6* Created by chengxiao on 2017/3/28.
  5. 7*/
  6. 8publicclassMutex implementsjava.io.Serializable {
  7. 9 //静态内部类,继承AQS10 privatestaticclassSync extendsAbstractQueuedSynchronizer {
  8. 11 //是否处于占用状态12 protectedbooleanisHeldExclusively() {
  9. 13 returngetState() == 1;
  10. 14 }
  11. 15 //当状态为0的时候获取锁,CAS操作成功,则state状态为1,16 publicbooleantryAcquire(intacquires) {
  12. 17 if(compareAndSetState(0, 1)) {
  13. 18 setExclusiveOwnerThread(Thread.currentThread());
  14. 19 returntrue;
  15. 20 }
  16. 21 returnfalse;
  17. 22 }
  18. 23 //释放锁,将同步状态置为024 protectedbooleantryRelease(intreleases) {
  19. 25 if(getState() == 0) thrownewIllegalMonitorStateException();
  20. 26 setExclusiveOwnerThread(null);
  21. 27 setState(0);
  22. 28 returntrue;
  23. 29 }
  24. 30 }
  25. 31 //同步对象完成一系列复杂的操作,我们仅需指向它即可32 privatefinalSync sync = newSync();
  26. 33 //加锁操作,代理到acquire(模板方法)上就行,acquire会调用我们重写的tryAcquire方法34 publicvoidlock() {
  27. 35 sync.acquire(1);
  28. 36 }
  29. 37 publicbooleantryLock() {
  30. 38 returnsync.tryAcquire(1);
  31. 39 }
  32. 40 //释放锁,代理到release(模板方法)上就行,release会调用我们重写的tryRelease方法。41 publicvoidunlock() {
  33. 42 sync.release(1);
  34. 43 }
  35. 44 publicbooleanisLocked() {
  36. 45 returnsync.isHeldExclusively();
  37. 46 }
  38. 47}
复制代码
  
同步器代码测试
测试下这个自定义的同步器,我们使用之前文章中做过的并发环境下a++的例子来说明问题(a++的原子性其实最好使用原子类AtomicInteger来解决,此处用Mutex有点大炮打蚊子的意味,好在能说明问题就好)
  1. 1packagejuc;
  2. 23importjava.util.concurrent.CyclicBarrier;
  3. 45/**6* Created by chengxiao on 2017/7/16.
  4. 7*/
  5. 8publicclassTestMutex {
  6. 9 privatestaticCyclicBarrier barrier = newCyclicBarrier(31);
  7. 10 privatestaticinta = 0;
  8. 11 privatestaticMutex mutex = newMutex();
  9. 1213 publicstaticvoidmain(String []args) throwsException {
  10. 14 //说明:我们启用30个线程,每个线程对i自加10000次,同步正常的话,最终结果应为300000;
  11. 15 //未加锁前16 for(inti=0;i <30;i++){
  12. 17 Thread t = newThread(newRunnable() {
  13. 18 @Override
  14. 19 publicvoidrun() {
  15. 20 for(inti=0;i <10000;i++){
  16. 21 increment1();//没有同步措施的a++;22 }
  17. 23 try{
  18. 24 barrier.await();//等30个线程累加完毕25 } catch(Exception e) {
  19. 26 e.printStackTrace();
  20. 27 }
  21. 28 }
  22. 29 });
  23. 30 t.start();
  24. 31 }
  25. 32 barrier.await();
  26. 33 System.out.println("加锁前,a="+a);
  27. 34 //加锁后35 barrier.reset();//重置CyclicBarrier36 a=0;
  28. 37 for(inti=0;i <30;i++){
  29. 38 newThread(newRunnable() {
  30. 39 @Override
  31. 40 publicvoidrun() {
  32. 41 for(inti=0;i <10000;i++){
  33. 42 increment2();//a++采用Mutex进行同步处理43 }
  34. 44 try{
  35. 45 barrier.await();//等30个线程累加完毕46 } catch(Exception e) {
  36. 47 e.printStackTrace();
  37. 48 }
  38. 49 }
  39. 50 }).start();
  40. 51 }
  41. 52 barrier.await();
  42. 53 System.out.println("加锁后,a="+a);
  43. 54 }
  44. 55 /**56 * 没有同步措施的a++
  45. 57 * @return58 */
  46. 59 publicstaticvoidincrement1(){
  47. 60 a++;
  48. 61 }
  49. 62 /**63 * 使用自定义的Mutex进行同步处理的a++
  50. 64 */
  51. 65 publicstaticvoidincrement2(){
  52. 66 mutex.lock();
  53. 67 a++;
  54. 68 mutex.unlock();
  55. 69 }
  56. 70}
复制代码
TestMutex
测试结果:
  1. <strong>加锁前,a=279204加锁后,a=300000</strong>
复制代码

源码分析
  我们先来简单描述下AQS的基本实现,前面我们提到过,AQS维护一个共享资源state,通过内置的FIFO来完成获取资源线程的排队工作。(这个内置的同步队列称为"CLH"队列)。该队列由一个一个的Node结点组成,每个Node结点维护一个prev引用和next引用,分别指向自己的前驱和后继结点。AQS维护两个指针,分别指向队列头部head和尾部tail。   其实就是个双端双向链表
  当线程获取资源失败(比如tryAcquire时试图设置state状态失败),会被构造成一个结点加入CLH队列中,同时当前线程会被阻塞在队列中(通过LockSupport.park实现,其实是等待态)。当持有同步状态的线程释放同步状态时,会唤醒后继结点,然后此结点线程继续加入到对同步状态的争夺中。  
Node结点
  Node结点是AbstractQueuedSynchronizer中的一个静态内部类,我们捡Node的几个重要属性来说一下
  1. 1
  2. staticfinalclassNode {
  3. 2 /**waitStatus值,表示线程已被取消(等待超时或者被中断)*/
  4. 3 staticfinalintCANCELLED = 1;
  5. 4 /**waitStatus值,表示后继线程需要被唤醒(unpaking)*/
  6. 5 staticfinalintSIGNAL = -1;
  7. 6 /**waitStatus值,表示结点线程等待在condition上,当被signal后,会从等待队列转移到同步到队列中 */
  8. 7 /**waitStatus value to indicate thread is waiting on condition */
  9. 8 staticfinalintCONDITION = -2;
  10. 9 /**waitStatus值,表示下一次共享式同步状态会被无条件地传播下去
  11. 10 static final int PROPAGATE = -3;
  12. 11 /** 等待状态,初始为0 */
  13. 12 volatileintwaitStatus;
  14. 13 /**当前结点的前驱结点 */
  15. 14 volatileNode prev;
  16. 15 /**当前结点的后继结点 */
  17. 16 volatileNode next;
  18. 17 /**与当前结点关联的排队中的线程 */
  19. 18 volatileThread thread;
  20. 19 /**...... */
  21. 20 }
复制代码

独占式
  获取同步状态--acquire()  
来看看acquire方法,lock方法一般会直接代理到acquire上
  1. 1publicfinalvoidacquire(intarg) {
  2. 2 if(!tryAcquire(arg) & &
  3. 3 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  4. 4 selfInterrupt();
  5. 5 }
复制代码
  我们来简单理一下代码逻辑:    a.首先,调用使用者重写的tryAcquire方法,若返回true,意味着获取同步状态成功,后面的逻辑不再执行;若返回false,也就是获取同步状态失败,进入b步骤;    b.此时,获取同步状态失败,构造独占式同步结点,通过addWatiter将此结点添加到同步队列的尾部(此时可能会有多个线程结点试图加入同步队列尾部,需要以线程安全的方 式添加);    c.该结点以在队列中尝试获取同步状态,若获取不到,则阻塞结点线程,直到被前驱结点唤醒或者被中断。
  addWaiter
    为获取同步状态失败的线程,构造成一个Node结点,添加到同步队列尾部
  1. privateNode addWaiter(Node mode) {
  2. Node node = newNode(Thread.currentThread(), mode);//构造结点//指向尾结点tail Node pred =tail;
  3. //如果尾结点不为空,CAS快速尝试在尾部添加,若CAS设置成功,返回;否则,eng。 if(pred != null) {
  4. node.prev =pred;
  5. if(compareAndSetTail(pred, node)) {
  6. pred.next =node;
  7. returnnode;
  8. }
  9. }
  10. enq(node);
  11. returnnode;
  12. }
复制代码
  先cas快速设置,若失败,进入enq方法  
  将结点添加到同步队列尾部这个操作,同时可能会有多个线程尝试添加到尾部,是非线程安全的操作。
  以上代码可以看出,使用了compareAndSetTail这个cas操作保证安全添加尾结点。  enq方法
  1. privateNode enq(finalNode node) {
  2. for(;;) {
  3. Node t =tail;
  4. if(t == null) { //如果队列为空,创建结点,同时被head和tail引用 if(compareAndSetHead(newNode())) tail =head;
  5. }else{
  6. node.prev =t;
  7. if(compareAndSetTail(t, node)) {//cas设置尾结点,不成功就一直重试 t.next =node;
  8. returnt;
  9. }
  10. }
  11. }
  12. }
复制代码
  enq内部是个死循环,通过CAS设置尾结点,不成功就一直重试。很经典的CAS自旋的用法,我们在之前关于原子类的源码分析中也提到过。这是一种乐观的并发策略
  最后,看下acquireQueued方法
  acquireQueued
  1. finalbooleanacquireQueued(finalNode node, intarg) {
  2. booleanfailed = true;
  3. try{
  4. booleaninterrupted = false;
  5. for(;;) {//死循环 finalNode p = node.predecessor();//找到当前结点的前驱结点 if(p == head & & tryAcquire(arg)) {//如果前驱结点是头结点,才tryAcquire,其他结点是没有机会tryAcquire的。 setHead(node);//获取同步状态成功,将当前结点设置为头结点。 p.next = null; //方便GC failed = false;
  6. returninterrupted;
  7. }
  8. //如果没有获取到同步状态,通过shouldParkAfterFailedAcquire判断是否应该阻塞,parkAndCheckInterrupt用来阻塞线程 if(shouldParkAfterFailedAcquire(p, node) & &
  9. parkAndCheckInterrupt()) interrupted = true;
  10. }
  11. }finally{
  12. if(failed) cancelAcquire(node);
  13. }
  14. }
复制代码
  acquireQueued内部也是一个死循环,只有前驱结点是头结点的结点,也就是老二结点,才有机会去tryAcquire;若tryAcquire成功,表示获取同步状态成功,将此结点设置为头结点;若是非老二结点,或者tryAcquire失败,则进入shouldParkAfterFailedAcquire去判断判断当前线程是否应该阻塞,若可以,调用parkAndCheckInterrupt阻塞当前线程,直到被中断或者被前驱结点唤醒。若还不能休息,继续循环。 shouldParkAfterFailedAcquire
  1. shouldParkAfterFailedAcquire用来判断当前结点线程是否能休息
复制代码
  1. privatestaticbooleanshouldParkAfterFailedAcquire(Node pred, Node node) {
  2. //获取前驱结点的wait值  intws =pred.waitStatus;
  3. if(ws == Node.SIGNAL)//若前驱结点的状态是SIGNAL,意味着当前结点可以被安全地park returntrue;
  4. if(ws > 0) {
  5. //ws >0,只有CANCEL状态ws才大于0。若前驱结点处于CANCEL状态,也就是此结点线程已经无效,从后往前遍历,找到一个非CANCEL状态的结点,将自己设置为它的后继结点 do{
  6. node.prev = pred =pred.prev;
  7. }while(pred.waitStatus > 0);
  8. pred.next =node;
  9. }else{
  10. //若前驱结点为其他状态,将其设置为SIGNAL状态 compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  11. }
  12. returnfalse
  13. ;
  14. }
复制代码
  若shouldParkAfterFailedAcquire返回true,也就是当前结点的前驱结点为SIGNAL状态,则意味着当前结点可以放心休息,进入parking状态了。parkAncCheckInterrupt阻塞线程并处理中断。
  1. privatefinalbooleanparkAndCheckInterrupt() {
  2. LockSupport.park(this);//使用LockSupport使线程进入阻塞状态 returnThread.interrupted();//线程是否被中断过 }
复制代码
  至此,关于acquire的方法源码已经分析完毕,我们来简单总结下
    a.首先tryAcquire获取同步状态,成功则直接返回;否则,进入下一环节;
    b.线程获取同步状态失败,就构造一个结点,加入同步队列中,这个过程要保证线程安全;
    c.加入队列中的结点线程进入自旋状态,若是老二结点(即前驱结点为头结点),才有机会尝试去获取同步状态;否则,当其前驱结点的状态为SIGNAL,线程便可安心休息,进入阻塞状态,直到被中断或者被前驱结点唤醒。

  释放同步状态--release()
  当前线程执行完自己的逻辑之后,需要释放同步状态,来看看release方法的逻辑
  1. publicfinalbooleanrelease(intarg) {
  2. if(tryRelease(arg)) {//调用使用者重写的tryRelease方法,若成功,唤醒其后继结点,失败则返回false Node h =head;
  3. if(h != null & & h.waitStatus != 0) unparkSuccessor(h);
  4. //唤醒后继结点 returntrue;
  5. }
  6. returnfalse;
  7. }
复制代码
  1. <strong>  unparkSuccessor:唤醒后继结点</strong>
  2.  
复制代码
  1. 1privatevoidunparkSuccessor(Node node) {
  2. 2 //获取wait状态3 intws =node.waitStatus;
  3. 4 if(ws < 0)
  4. 5 compareAndSetWaitStatus(node, ws, 0);//将等待状态waitStatus设置为初始值06 Node s = node.next;//后继结点7 if(s == null|| s.waitStatus > 0) {//若后继结点为空,或状态为CANCEL(已失效),则从后尾部往前遍历找到一个处于正常阻塞状态的结点     进行唤醒8 s = null;
  5. 9 for(Node t = tail; t != null & & t != node; t =t.prev)
  6. 10 if(t.waitStatus <= 0)
  7. 11 s =t;
  8. 12 }
  9. 13 if(s != null)
  10. 14 LockSupport.unpark(s.thread);//使用LockSupprot唤醒结点对应的线程
  11. 15 }
复制代码
  release的同步状态相对简单,需要找到头结点的后继结点进行唤醒,若后继结点为空或处于CANCEL状态,从后向前遍历找寻一个正常的结点,唤醒其对应线程。
共享式

  共享式:共享式地获取同步状态。对于独占式同步组件来讲,同一时刻只有一个线程能获取到同步状态,其他线程都得去排队等待,其待重写的尝试获取同步状态的方法tryAcquire返回值为boolean,这很容易理解;对于共享式同步组件来讲,同一时刻可以有多个线程同时获取到同步状态,这也是 \"共享 \"的意义所在。其待重写的尝试获取同步状态的方法tryAcquireShared返回值为int。
  1. protectedinttryAcquireShared(intarg) {
  2. thrownewUnsupportedOperationException();
  3. }
复制代码
  1.当返回值大于0时,表示获取同步状态成功,同时还有剩余同步状态可供其他线程获取;  2.当返回值等于0时,表示获取同步状态成功,但没有可用同步状态了;  3.当返回值小于0时,表示获取同步状态失败。
  获取同步状态--acquireShared  
  1. publicfinalvoidacquireShared(intarg) {
  2. if(tryAcquireShared(arg) < 0)//返回值小于0,获取同步状态失败,排队去;获取同步状态成功,直接返回去干自己的事儿。 doAcquireShared(arg);
  3. }
复制代码
  doAcquireShared
  1. 1privatevoiddoAcquireShared(intarg) {
  2. 2 finalNode node = addWaiter(Node.SHARED);//构造一个共享结点,添加到同步队列尾部。若队列初始为空,先添加一个无意义的傀儡结点,再将新节点添加到队列尾部。3 booleanfailed = true;//是否获取成功4 try{
  3. 5 booleaninterrupted = false;//线程parking过程中是否被中断过6 for(;;) {//死循环7 finalNode p = node.predecessor();//找到前驱结点8 if(p == head) {//头结点持有同步状态,只有前驱是头结点,才有机会尝试获取同步状态9 intr = tryAcquireShared(arg);//尝试获取同步装填10 if(r >= 0) {//r >=0,获取成功11 setHeadAndPropagate(node, r);//获取成功就将当前结点设置为头结点,若还有可用资源,传播下去,也就是继续唤醒后继结点12 p.next = null; //方便GC13 if(interrupted)
  4. 14 selfInterrupt();
  5. 15 failed = false;
  6. 16 return;
  7. 17 }
  8. 18 }
  9. 19 if(shouldParkAfterFailedAcquire(p, node) & &//是否能安心进入parking状态20 parkAndCheckInterrupt())//阻塞线程21 interrupted = true;
  10. 22 }
  11. 23 } finally{
  12. 24 if(failed)
  13. 25 cancelAcquire(node);
  14. 26 }
  15. 27 }
复制代码
  大体逻辑与独占式的acquireQueued差距不大,只不过由于是共享式,会有多个线程同时获取到线程,也可能同时释放线程,空出很多同步状态,所以当排队中的老二获取到同步状态,如果还有可用资源,会继续传播下去。
  setHeadAndPropagate
  1. privatevoidsetHeadAndPropagate(Node node, intpropagate) {
  2. Node h = head; //Record old head for check below setHead(node);
  3. if(propagate > 0 || h == null|| h.waitStatus < 0) {
  4. Node s =node.next;
  5. if(s == null||s.isShared()) doReleaseShared();
  6. }
  7. }
复制代码
  释放同步状态--releaseShared
  1. publicfinalbooleanreleaseShared(intarg) {
  2. if(tryReleaseShared(arg)) {
  3. doReleaseShared();//释放同步状态
  4. returntrue;
  5. }
  6. returnfalse;
  7. }
复制代码
  doReleaseShared
  1. privatevoiddoReleaseShared() {
  2. for(;;) {//死循环,共享模式,持有同步状态的线程可能有多个,采用循环CAS保证线程安全 Node h =head;
  3. if(h != null & & h !=tail) {
  4. intws =h.waitStatus;
  5. if(ws ==Node.SIGNAL) {
  6. if(!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue; unparkSuccessor(h);//唤醒后继结点 }
  7. elseif(ws == 0 & &
  8. !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; }
  9. if(h == head)  break;
  10. }
  11. }
复制代码
  代码逻辑比较容易理解,需要注意的是,共享模式,释放同步状态也是多线程的,此处采用了CAS自旋来保证。
总结
  关于AQS的介绍及源码分析到此为止了。
  AQS是JUC中很多同步组件的构建基础,简单来讲,它内部实现主要是状态变量state和一个FIFO队列来完成,同步队列的头结点是当前获取到同步状态的结点,获取同步状态state失败的线程,会被构造成一个结点(或共享式或独占式)加入到同步队列尾部(采用自旋CAS来保证此操作的线程安全),随后线程会阻塞;释放时唤醒头结点的后继结点,使其加入对同步状态的争夺中。
  AQS为我们定义好了顶层的处理实现逻辑,我们在使用AQS构建符合我们需求的同步组件时,只需重写tryAcquire,tryAcquireShared,tryRelease,tryReleaseShared几个方法,来决定同步状态的释放和获取即可,至于背后复杂的线程排队,线程阻塞/唤醒,如何保证线程安全,都由AQS为我们完成了,这也是非常典型的模板方法的应用。AQS定义好顶级逻辑的骨架,并提取出公用的线程入队列/出队列,阻塞/唤醒等一系列复杂逻辑的实现,将部分简单的可由使用者决定的操作逻辑延迟到子类中去实现。 
该用户未在地球留下任何的痕迹

本版积分规则

QQ|小黑屋|帮助|IT技术擎 ( 沪ICP备15054863号  

GMT+8, 2018-10-18 11:19

Powered by Discuz! X3.2 Licensed

© 2001-2013 Comsenz Inc.

快速回复 返回顶部 返回列表