线程的三个同步器

栏目: IT技术 · 发布时间: 3年前

内容简介:不知不觉就遇到了线程同步器问题,查了资料写下了总结日常中会有开启多个线程去并发执行任务,而join()方法不够灵活,现在JDK提供了

不知不觉就遇到了线程同步器问题,查了资料写下了总结

1. CountDownLatch

日常中会有开启多个线程去并发执行任务,而 主线程要等所有子线程执行完之后才能运行的需求 。之前我们是使用Thread.join方法来实现的,过程如下:

public static void main(String[] args) throws InterruptedException {

    Thread t1 = new Thread( () -> {
        try {
            Thread.sleep(1000);
            System.out.println("t1 over");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });

    Thread t2 = new Thread( () -> {
        try {
            Thread.sleep(2000);
            System.out.println("t2 over");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });

    t1.start();
    t2.start();
    
    t1.join();
    t2.join();

    System.out.println("mian over");
}
t1 over
t2 over
mian over

join()方法不够灵活,现在JDK提供了 CountDownLatch 这个类来实现所需功能

private static CountDownLatch countDownLatch = new CountDownLatch(2);

public static void main(String[] args) throws InterruptedException {

    ExecutorService t = Executors.newCachedThreadPool();

    Runnable r1 = () -> {
        try {
            System.out.println("r1 sleep");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            countDownLatch.countDown();
        }
    };

    Runnable r2 = () -> {
        try {
            System.out.println("r2 sleep");
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            countDownLatch.countDown();
        }
    };

    t.submit(r1);
    t.submit(r2);

    System.out.println("main wait");
    countDownLatch.await();
    System.out.println("main over");
}
main wait
r1 sleep
r2 sleep
main over

CountDownLatch流程:

  • 新建CountDownLatch实例,传入计数器次数
  • 主线程调用CountDownLatch.await()方法后会被阻塞
  • 子线程中在某处调用CountDownLatch.countDown()方法可使内部计数器减1
  • 当计数器变成0时,主线程的await()方法才会返回

CountDownLatch优点:

  • 调用Thread.join()调用线程会被阻塞至子线程运行完毕,而CountDownLatch.countDown()可在线程运行中执行
  • 使用线程池时是提交任务的,而没有接触到线程无法使用线程方法,那么countDown()可加在Runnable中执行

CountDownLatch原理:

内部维护了一个计数器,当计数器为0就放行,源码就不放了,熟悉AQS的同学想想就知道怎么回事

  • 继承了AQS,其实就是用AQS的state来表示计数器
  • await()方法内部有acquireSharedInterruptibly(),后者调用了重写tryaquireShared()其实就是判断计数器是否为0,不为0则阻塞进AQS队列
  • countDown()方法内部有releaseShared(),后者调用了重写tryReleaseShared()计数器减一,若为0,则唤醒阻塞线程

2. CyclicBarrier

满足多个线程都到达同一个位置后才全部开始运行的需求。CountDownLatch是一次性使用的,计数器为0后再次调用会直接返回,此时升级版的CyclicBarrier来了,其一可以满足计数器重置功能,且二还可以让一组线程达到一个状态后再全部同时执行

场景要求:假设一个任务分为3个阶段,每个线程要串行地从低阶段执行到高阶段

private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, 
                        () -> System.out.println("一个阶段完成"));

public static void main(String[] args) throws InterruptedException {

    ExecutorService service = Executors.newCachedThreadPool();

    Runnable r1 = () -> {
        try {
            System.out.println(Thread.currentThread() + "Step1");
            cyclicBarrier.await();

            System.out.println(Thread.currentThread() + "Step2");
            cyclicBarrier.await();

            System.out.println(Thread.currentThread() + "Step3");

        } catch (Exception e) {
            e.printStackTrace();
        }
    };

    Runnable r2 = () -> {
        try {
            System.out.println(Thread.currentThread() + "Step1");
            cyclicBarrier.await();

            System.out.println(Thread.currentThread() + "Step2");
            cyclicBarrier.await();

            System.out.println(Thread.currentThread() + "Step3");

        } catch (Exception e) {
            e.printStackTrace();
        }
    };

    service.submit(r1);
    service.submit(r2);

    service.shutdown();
}
Thread[pool-1-thread-1,5,main]Step1
Thread[pool-1-thread-2,5,main]Step1
一个阶段完成
Thread[pool-1-thread-1,5,main]Step2
Thread[pool-1-thread-2,5,main]Step2
一个阶段完成
Thread[pool-1-thread-1,5,main]Step3
Thread[pool-1-thread-2,5,main]Step3

CyclicBarrier的流程

  • 和上面差不多就不一一解释了
  • CyclicBarrier的构造方法中,第一个参数为计数器次数,第二个为阶段结束后要执行的方法

CyclicBarrier的原理

  • 基于独占锁,底层是AQS实现,独占锁可以原子性改变计数器,以及条件队列阻塞线程来实现线程同步
  • 内部有parties和count变量,实现重置功能
  • await()方法内调用dowait()方法
    • 获取锁更新次数减一
    • 没有为0,阻塞当前线程加入条件队列
    • 为0执行屏蔽点任务,然后唤醒条件队列的全部线程

3. Semaphore

不同与前两者,Semaphore信号量内部计数器是递增的,在需要同步的地方调用acquire指定需要同步的个数即可

private static Semaphore semaphore = new Semaphore(0);

public static void main(String[] args) throws InterruptedException {

    ExecutorService service = Executors.newCachedThreadPool();

    Runnable r1 = () -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread() + "over");
        semaphore.release();;
    };

    Runnable r2 = () -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread() + "over");
        semaphore.release();
    };

    service.submit(r1);
    service.submit(r2);

    semaphore.acquire(2);
    System.out.println("All child thread over");

    service.shutdown();
}

Semaphore的流程

  • Semaphore的构造函数传参复制当前计数器的值
  • 每个线程内部调用release()即计数器加1
  • 主线程调用acquire()方法传参为2 ,会被阻塞至计数器到达2

Semaphore的原理

  • 底层还是使用AQS,提供了公平与非公平,也是用state表示次数
  • acquire()方法获取一个信号量,并且state减一
    • 若为0,直接返回
    • 不为0当前线程会被加入AQS阻塞队列
  • release()方法,把当前Semaphore的信号量加1,然后会选择一个信号量满足的线程进行激活
  • 内部还实现了公平与非公平策略

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

比特币

比特币

李钧、长铗 / 中信出版社 / 2014-1-1 / 39.00元

2009年,比特币诞生。比特币是一种通过密码编码,在复杂算法的大量计算下产生的电子货币。虽然是虚拟货币,比特币却引起了前所未有的全球关注热潮。 这一串凝结着加密算法与运算能力的数字不仅可以安全流通、换取实物,1比特币价值甚至曾高达8 000元人民币。有研究者认为比特币具备打破几千年来全球货币由国家垄断发行的可能性。在不经意间,比特币引起的金融新浪潮已悄然成型。 虚拟货币并不是新鲜事物,......一起来看看 《比特币》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具