Java多线程共享模型之管程

Monitor(锁)

Monitor被翻译为监视器或管程。

Monitor是重量级锁的实现,是向操作系统申请的。

Monitor的结构:WaitSet、EntryList、Owner

运行状态:Owner指向当前线程

堵塞状态:进入EntryList,称为BLOCKED状态。当Owner解除占用之后,通知EntryList中的线程可以竞争锁。

锁优化

轻量级锁

深入理解java虚拟机第三版读书笔记13

锁膨胀

如果在尝试加轻量级锁的过程中,CAS 操作无法成功,这时一种情况就是有其它线程为此对象加上了轻量级锁(有竞争),这时需要进行锁膨胀,将轻量级锁变为重量级锁。

自旋优化

重量级锁竞争的时候,还可以使用自旋来进行优化,如果当前线程自旋成功(即这时候持锁线程已经退出了同步块,释放了锁),这时当前线程就可以避免阻塞。

  • 自旋会占用 CPU 时间,单核 CPU 自旋就是浪费,多核 CPU 自旋才能发挥优势。
  • 在 Java 6 之后自旋锁是自适应的,比如对象刚刚的一次自旋操作成功过,那么认为这次自旋成功的可能性会高,就多自旋几次;反之,就少自旋甚至不自旋,总之,比较智能。
  • Java 7 之后不能控制是否开启自旋功能

偏向锁

参考深入理解java虚拟机第三版读书笔记13

一个对象创建时:
+ 如果开启了偏向锁(默认开启),那么对象创建后,markword 值为 0x05 即最后 3 位为 101,这时它的thread、epoch、age 都为 0(代表它是可偏向的,一旦获取了它的哈希码就不可偏向了)
+ 偏向锁默认是延迟的,不会在程序启动时立即生效,如果想避免延迟,可以加 VM 参数 -XX:BiasedLockingStartupDelay=0 来禁用延迟
+ 如果没有开启偏向锁,那么对象创建后,markword 值为 0x01 即最后 3 位为 001,这时它的 hashcode、age 都为 0,第一次用到 hashcode 时才会赋值

调用 wait/notify也会撤销偏向锁:

public static void main(String[] args) throws InterruptedException {
    Dog d = new Dog();
    Thread t1 = new Thread(() -> {
        log.debug(ClassLayout.parseInstance(d).toPrintableSimple(true));
        synchronized (d) {
            log.debug(ClassLayout.parseInstance(d).toPrintableSimple(true));
            try {
                d.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.debug(ClassLayout.parseInstance(d).toPrintableSimple(true));
        }
    }, "t1");
    t1.start();
    new Thread(() -> {
        try {
            Thread.sleep(6000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        synchronized (d) {
            log.debug("notify");
            d.notify();
        }
    }, "t2").start();
}

批量重偏向

当一个线程创建了大量对象并执行了初始的同步操作,后来另一个线程也来将这些对象作为锁对象进行操作,会导偏向锁重偏向的操作。

如果对象虽然被多个线程访问,但没有竞争,这时偏向了线程 T1 的对象仍有机会重新偏向 T2,重偏向会重置对象的 Thread ID.

默认偏向锁批量重偏向阈值为20。

(前20个对象会升级为轻量级锁,后面20-40则重偏向)

批量撤销

在多线程竞争剧烈的情况下,使用偏向锁将会降低效率,于是乎产生了批量撤销机制。

默认偏向锁批量撤销阈值为40,如果撤销偏向锁次数超过40,不会再重偏向,全部改为不可偏向的,新建的对象也改为不可偏向的。

锁消除

参考深入理解java虚拟机第三版读书笔记13

关闭锁消除优化:-XX:-EliminateLocks

wait notify

Java多线程共享模型之管程

原理

  • Owner 线程发现条件不满足,调用 wait 方法,即可进入 WaitSet 变为 WAITING 状态
  • BLOCKED 和 WAITING 的线程都处于阻塞状态,不占用 CPU 时间片
  • BLOCKED 线程会在 Owner 线程释放锁时唤醒
  • WAITING 线程会在 Owner 线程调用 notify 或 notifyAll 时唤醒,但唤醒后并不意味者立刻获得锁,仍需进入 EntryList 重新竞争

API

  • obj.wait() 让进入 object 监视器的线程到 waitSet 等待
  • obj.wait(long millis) 等待限制的时间,超出时间进入就绪状态
  • obj.notify() 在 object 上正在 waitSet 等待的线程中随机挑一个唤醒
  • obj.notifyAll() 让 object 上正在 waitSet 等待的线程全部唤醒

它们都是线程之间进行协作的手段,都属于 Object 对象的方法。必须获得此对象的锁,才能调用这几个方法

synchronized(lock) {
    while(条件不成立) {
        lock.wait();
    }
    // 干活
}
//另一个线程
synchronized(lock) {
    lock.notifyAll();
}

sleep(long n) 和 wait(long n) 的区别

  1. sleep 是 Thread 方法,而 wait 是 Object 的方法
  2. sleep 不需要强制和 synchronized 配合使用,但 wait 需要和 synchronized 一起用
  3. sleep 在睡眠的同时,不会释放对象锁的,但 wait 在等待的时候会释放对象锁

同步模式之保护性暂停

Guarded Suspension,用在一个线程等待另一个线程的执行结果

Java多线程共享模型之管程

要点:

  • 有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject
  • 如果有结果不断从一个线程到另一个线程那么可以使用消息队列
  • JDK 中,join 的实现、Future 的实现,采用的就是此模式
  • 因为要等待另一方的结果,因此归类到同步模式

实现:

class GuardedObject {
    private Object response;
    private final Object lock = new Object();
    public Object get() {
        synchronized (lock) {
            // 条件不满足则等待
            while (response == null) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            return response;
        }
    }
    public void complete(Object response) {
        synchronized (lock) {
            // 条件满足,通知等待线程
            this.response = response;
            lock.notifyAll();
        }
    }
}

带超时的版本:

class GuardedObjectV2 {
    private Object response;
    private final Object lock = new Object();
    public Object get(long millis) {
        synchronized (lock) {
        // 1) 记录最初时间
        long begin = System.currentTimeMillis();
        // 2) 已经经历的时间
        long timePassed = 0;
        while (response == null) {
            // 4) 假设 millis 是 1000,结果在 400 时唤醒了,那么还有 600 要等
            long waitTime = millis - timePassed;
            log.debug("waitTime: {}", waitTime);
            if (waitTime <= 0) {
                log.debug("break...");
                break;
            }
            try {
                lock.wait(waitTime);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 3) 如果提前被唤醒,这时已经经历的时间假设为 400
            timePassed = System.currentTimeMillis() - begin;
            log.debug("timePassed: {}, object is null {}",
            timePassed, response == null);
        }
        return response;
        }
    }
    public void complete(Object response) {
        synchronized (lock) {
            // 条件满足,通知等待线程
            this.response = response;
            log.debug("notify...");
            lock.notifyAll();
        }
    }
}

异步模式之生产者/消费者

Java多线程共享模型之管程

要点

  • 与前面的保护性暂停中的 GuardObject 不同,不需要产生结果和消费结果的线程一一对应
  • 消息队列可以用来平衡生产和消费的线程资源
  • 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
  • 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
  • JDK 中各种阻塞队列,采用的就是这种模式

实现

class Message {
    private int id;
    private Object message;
    public Message(int id, Object message) {
        this.id = id;
        this.message = message;
    }
    public int getId() {
        return id;
    }
    public Object getMessage() {
        return message;
    }
}

class MessageQueue {
    private LinkedList<Message> queue;
    private int capacity;
    public MessageQueue(int capacity) {
        this.capacity = capacity;
        queue = new LinkedList<>();
    }
    public Message take() {
        synchronized (queue) {
            while (queue.isEmpty()) {
                log.debug("没货了, wait");
                try {
                    queue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            Message message = queue.removeFirst();
            queue.notifyAll();
            return message;
        }
    }
    public void put(Message message) {
        synchronized (queue) {
            while (queue.size() == capacity) {
                log.debug("库存已达上限, wait");
                try {
                    queue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            queue.addLast(message);
            queue.notifyAll();
        }   
    }
}

park unpark

基本使用

它们是 LockSupport 类中的方法

// 暂停当前线程
LockSupport.park();
// 恢复某个线程的运行
LockSupport.unpark(暂停线程对象)

先park再unpark

Thread t1 = new Thread(() -> {
    System.out.println("start...");
    try {
        Thread.sleep(1000);
        System.out.println("park...");
        LockSupport.park();
        System.out.println("resume...");
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
},"t1");
t1.start();
Thread.sleep(2000);
System.out.println("unpark...");
LockSupport.unpark(t1);

可以先调用unpark再调用park,会抵消。(但是多次unpark也只能抵消一次park)

特点

与 Object 的 wait & notify 相比

  • wait,notify 和 notifyAll 必须配合 Object Monitor 一起使用,而 park,unpark 不必
  • park & unpark 是以线程为单位来【阻塞】和【唤醒】线程,而 notify 只能随机唤醒一个等待线程,notifyAll是唤醒所有等待线程,就不那么【精确】
  • park & unpark 可以先 unpark,而 wait & notify 不能先 notify

ReentrantLock

相对于 synchronized 它具备如下特点
+ 可中断
+ 可以设置超时时间
+ 可以设置为公平锁
+ 支持多个条件变量

与 synchronized 一样,都支持可重入

// 获取锁
reentrantLock.lock();
try {
    // 临界区
} finally {
    // 释放锁
    reentrantLock.unlock();
}

可打断

lockInterruptibly()方法等待锁期间可以被interrupt打断

ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
    log.debug("启动...");
    try {
        //如果该锁没有被占用就会获得锁
        //如果该锁被占用就进入阻塞队列,可以被其他线程用interrupt打断
        lock.lockInterruptibly();
    } catch (InterruptedException e) {
        e.printStackTrace();
        log.debug("等锁的过程中被打断");
        return;
    }
    try {
        log.debug("获得了锁");
    } finally {
        lock.unlock();
    }
}, "t1");
lock.lock();
log.debug("获得了锁");
t1.start();
try {
    sleep(1);
    t1.interrupt();
    log.debug("执行打断");
} finally {
    lock.unlock();
}

锁超时

tryLock()可带参可不带参,如果未指定时间,只要该锁被占用就获取不到锁,返回false,如果指定时间,在指定时间内该锁还被占用则获取不到锁,返回false

ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
    log.debug("启动...");
    if (!lock.tryLock()) {
        log.debug("获取立刻失败,返回");
        return;
    }
    try {
        log.debug("获得了锁");
    } finally {
        lock.unlock();
    }
}, "t1");
lock.lock();
log.debug("获得了锁");
t1.start();
try {
    sleep(2);
} finally {
    lock.unlock();
}

公平锁

ReentrantLock 默认是不公平的,可以使用new ReentrantLock(true)
设置为公平锁,采用先到先得的方法获得锁。(排队)

条件变量

  • Condition.await()
  • Condition.await(int millis)
  • Condition.signal()
  • Condition.signalAll()

使用要点:

  • await 前需要获得锁
  • await 执行后,会释放锁,进入 conditionObject 等待
  • await 的线程被唤醒(或打断、或超时)取重新竞争 lock 锁
  • 竞争 lock 锁成功后,从 await 后继续执行

同步模式之顺序控制

固定顺序

比如,必须先 2 后 1 打印

wait notify 版

// 用来同步的对象
static Object obj = new Object();
// t2 运行标记, 代表 t2 是否执行过
static boolean t2runed = false;
    public static void main(String[] args) {
    Thread t1 = new Thread(() -> {
        synchronized (obj) {
            // 如果 t2 没有执行过
            while (!t2runed) {
                try {
                    // t1 先等一会
                    obj.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        System.out.println(1);
    });
    Thread t2 = new Thread(() -> {
        System.out.println(2);
        synchronized (obj) {
            // 修改运行标记
            t2runed = true;
            // 通知 obj 上等待的线程(可能有多个,因此需要用 notifyAll)
            obj.notifyAll();
        }
    });
    t1.start();
    t2.start();
}

Park Unpark 版

park 和 unpark 方法比较灵活,他俩谁先调用,谁后调用无所谓。并且是以线程为单位进行『暂停』和『恢复』,不需要『同步对象』和『运行标记』

Thread t1 = new Thread(() -> {
    try { 
        Thread.sleep(1000);
        } catch (InterruptedException e) { }
    // 当没有『许可』时,当前线程暂停运行;有『许可』时,用掉这个『许可』,当前线程恢复运行
    LockSupport.park();
    System.out.println("1");
});
Thread t2 = new Thread(() -> {
    System.out.println("2");
    // 给线程 t1 发放『许可』(多次连续调用 unpark 只会发放一个『许可』)
    LockSupport.unpark(t1);
});
t1.start();
t2.start();

交替输出

leetcode1115-交替打印FooBar

除此之外知道线程对象还可以使用park、unpark。

原创文章,作者:彭晨涛,如若转载,请注明出处:https://www.codetool.top/article/java%e5%a4%9a%e7%ba%bf%e7%a8%8b%e5%85%b1%e4%ba%ab%e6%a8%a1%e5%9e%8b%e4%b9%8b%e7%ae%a1%e7%a8%8b/

发表评论

电子邮件地址不会被公开。