栏目分类:
子分类:
返回
终身学习网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
终身学习网 > IT > 软件开发 > 后端开发 > Java

AbstractQueuedSynchronizer

Java 更新时间:发布时间: 百科书网 趣学号

早期程序员会自己通过一种同步器去实现另一种相近的同步器,例如用可重入锁去实现信号量,或反之。这显然不够优雅,于是在JDK1.5中新增了 AQS,提供了这种通用的同步器机制。

是什么

抽象的队列同步器

AbstractOwnableSynchronizer
AbstractQueuedLongSynchronizer
AbstractQueuedSynchronizer

通常地:AbstractQueuedSynchronizer简称为AQS

是用来构建锁或者其它同步器组件的重量级基础框架及整个JUC体系的基石,通过内置的FIFO队列来完成资源获取线程的排队工作,并通过一个int类变量state表示持有锁的状态

CLH:Craig、Landin and Hagersten 队列,是一个单向链表,AQS中的队列是CLH变体的虚拟双向队列FIFO

特点:

AQS全称是 AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架

用 state 属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁

getState - 获取 state 状态
setState - 设置 state 状态

compareAndSetState - cas 机制设置 state 状态

独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源
提供了基于 FIFO 的等待队列,类似于 Monitor 的 EntryList
条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet
子类主要实现这样一些方法(默认抛出 UnsupportedOperationException)

AQS为什么是JUC内容中最重要的基石

和AQS有关的

ReentrantLock

CountDownLatch

ReentrantReadWriteLock

Semaphore

进一步理解锁和同步器的关系

锁,面向锁的使用者

定义了程序员和锁交互的使用层API,隐藏了实现细节,你调用即可。

同步器,面向锁的实现者

Java并发大神DougLee,提出统一规范并简化了锁的实现,
屏蔽了同步状态管理、阻塞线程排队和通知、唤醒机制等。

AQS能干嘛

加锁会导致阻塞,有阻塞就需要排队,实现排队必然需要队列(AQS管理队列)

抢到资源的线程直接使用处理业务,抢不到资源的必然涉及一种排队等候机制。抢占资源失败的线程继续去等待(类似银行业务办理窗口都满了,暂时没有受理窗口的顾客只能去候客区排队等候),但等候线程仍然保留获取锁的可能且获取锁流程仍在继续(候客区的顾客也在等着叫号,轮到了再去受理窗口办理业务)。

既然说到了排队等候机制,那么就一定会有某种队列形成,这样的队列是什么数据结构呢?

如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中,这个队列就是AQS的抽象表现。它将请求共享资源的线程封装成队列的结点(Node),通过CAS、自旋以及LockSupport.park()的方式,维护state变量的状态,使并发达到同步的效果。

AQS= state+CLH队列(FIFO)

有阻塞就需要排队,实现排队必然需要队列

AQS使用一个volatile的int类型的成员变量来表示同步状态,通过内置的
FIFO队列来完成资源获取的排队工作将每条要去抢占资源的线程封装成
一个Node节点来实现锁的分配,通过CAS完成对State值的修改。

1) state 设计

state 使用 volatile 配合 cas 保证其修改时的原子性
state 使用了 32bit int 来维护同步状态,因为当时使用 long 在很多平台下测试的结果并不理想

2) 阻塞恢复设计

早期的控制线程暂停和恢复的 api 有 suspend 和 resume,但它们是不可用的,因为如果先调用的 resume那么 suspend 将感知不到
解决方法是使用 park & unpark 来实现线程的暂停和恢复,具体原理在之前讲过了,先 unpark 再 park 也没问题
park & unpark 是针对线程的,而不是针对同步器的,因此控制粒度更为精细
park 线程还可以通过 interrupt 打断

3) 队列设计–CLH队列的变体实现–双向队列

使用了 FIFO 先入先出队列,并不支持优先级队列
设计时借鉴了 CLH 队列,它是一种单向无锁队列

AQS内部体系架构

AQS中有一个Node静态内部类 用来保存等待获取锁的线程

head 、tail表示同步器头、尾节点

state 同步状态

AQS同步队列的基本结构

CLH:Craig、Landin and Hagersten 队列,是个单向链表,AQS中的队列是CLH变体的虚拟双向队列(FIFO)

AQS的int变量State

AQS的同步状态 State成员变量 持有锁的状态

private volatile int state;

可以看做银行办理业务的受理窗口状态

0就是没人,空闲状态可以办理

大于等于1,就是有人占用窗口,需要排队等着

AQS的CLH队列

CLH队列(三个大牛的名字组成),为一个双向队列

可以看做银行候客区的等待顾客 排队等待

CLH 好处:
无锁,使用自旋

快速,无阻塞

AQS 在一些方面改进了 CLH

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        // 队列中还没有元素 tail 为 null
        if (t == null) { // Must initialize
            // 将 head 从 null -> dummy 哨兵(傀儡)节点
            if (compareAndSetHead(new Node()))
                // 将 node 的 prev 设置为原来的 tail
                tail = head;
        } else {
            node.prev = t;
            // 将 tail 从原来的 tail 设置为 node
            if (compareAndSetTail(t, node)) {
                // 原来 tail 的 next 设置为 nod
                t.next = node;
                return t;
            }
        }
    }
}
内部类Node(Node类在AQS类内部) Node类的讲解

内部结构

属性说明

Node的int变量waitState

Node的等待状态waitState成员变量

队列中每个排队的个体就是一个 Node

waitState可以看做等候区其它顾客(其它线程)的等待状态

AQS实现一个不可重入锁

自定义同步器

package com.dongguo.aqs;

import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;


public class MySync extends AbstractQueuedSynchronizer {
    @Override
    protected boolean tryAcquire(int arg) {
        //初始state值为0
        if (compareAndSetState(0, 1)) {
            //加锁成功,并且设置锁的持有者Owner为当前线程
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int arg) {

        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    
    @Override
    protected boolean isHeldExclusively() {
        return getState() == 1;
    }

    public Condition newCondition() {
        return new ConditionObject();
    }
}

自定义锁
有了自定义同步器,很容易复用 AQS ,实现一个功能完备的自定义锁

package com.dongguo.aqs;


import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;


public class MyLock implements Lock {

    //独占锁
    private MySync sync = new MySync();

    
    @Override
    public void lock() {
        sync.acquire(1);
    }

    
    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    
    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1,unit.toNanos(time));
    }

    
    @Override
    public void unlock() {
        sync.release(1);
    }

    
    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}

测试

package com.dongguo.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;


@Slf4j(topic = "d.Client")
public class Client {
    public static void main(String[] args) {
        MyLock lock = new MyLock();
        new Thread(() -> {
            lock.lock();
            try {
                log.debug("locking...");
                try {
                    //t1睡眠1s
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } finally {
                log.debug("unlocking...");
                lock.unlock();
            }
        }, "t1").start();
        new Thread(() -> {
            lock.lock();
            try {
                log.debug("locking...");
            } finally {
                log.debug("unlocking...");
                lock.unlock();
            }
        }, "t2").start();
    }
}
运行结果
14:48:04 [t1] d.Client - locking...
14:48:05 [t1] d.Client - unlocking...
14:48:05 [t2] d.Client - locking...
14:48:05 [t2] d.Client - unlocking...

测试不可重入

package com.dongguo.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;


@Slf4j(topic = "d.Client")
public class Client {
    public static void main(String[] args) {
        MyLock lock = new MyLock();
        new Thread(() -> {
            lock.lock();
            log.debug("locking...");
            lock.lock();
            log.debug("locking...");
            try {
                try {
                    //t1睡眠1s
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } finally {
                log.debug("unlocking...");
                lock.unlock();
                log.debug("unlocking...");
                lock.unlock();
            }
        }, "t1").start();
    }
}

只会打印一次 locking 进入阻塞,不可重入

转载请注明:文章转载自 www.051e.com
本文地址:http://www.051e.com/it/270207.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 ©2023-2025 051e.com

ICP备案号:京ICP备12030808号