JUC包是Java标准库的一个包java.util.concurrent,是Java 中处理并发编程的核心工具包,JDK 5 及以上版本引入,封装了大量线程安全的组件,简化了多线程编程的复杂度,避免手动处理锁、线程管理等底层细节。

一、Callable接口

我们先引入一个场景,现在我想实现一个程序,一个线程做累加操作,另一个线程等待累加结束后打印结果

public class Demo_1100 {
    public static void main(String[] args) throws InterruptedException {
        Result result = new Result();
        // 创建一个线程进行累加操作
        Thread t = new Thread() {
            @Override
            public void run() {
                int sum = 0;
                for (int i = 1; i <= 1000; i++) {
                    // 执行累加操作
                    sum += i;
                }
                // 为结果赋值
                result.sum = sum;
                // 唤醒等待的线程
                synchronized (result.lock) {
                    result.lock.notify();
                }
            }
        };
        // 启动线程
        t.start();

        // 要主线程中获取结果,这时线程t还没有计算完成
        synchronized (result.lock) {
            // 检查累加是否执行完成
            while (result.sum == 0) {
                // 没有累加完成,等待结果
                result.lock.wait();
            }
            // 打印结果
            System.out.println(result.sum);
        }
    }
}

// 中间变量,进行保存累加结果
class Result {
    // 累加和
    public int sum = 0;
    // 锁对象
    public Object lock = new Object();
}

主线程要获取结果,此时线程t还没有完成累加操作,主线程获取锁并且阻塞等待释放锁,等待线程t结束累加并且调用notify操作之后,主线程被唤醒才继续执行打印累加结果,一个简单的操作消耗了这么多资源,有没有什么简单的方法可以获取t线程的结果呢?

1.1 Callable的概念

Callable是⼀个interface。相当于把线程封装了⼀个"返回值"。⽅便程序猿借助多线程的⽅式计算结果,我们来看一下Callable接口的源码

可以看到Callable接口中只有一个call()方法,是不是感到有些熟悉,我们之前学习Runnable接口的时候提到过,Runnable接口中只有一个run()方法,我们称这种只有一个方法的接口为函数式接口,可以使用lambda表达式简化表达

两个方法对比,run方法的返回值是void,但是call是有返回值的,返回值是传入的参数,而且call方法是可以抛出异常给调用者的

Runnable接口可以重写run方法设定线程任务,但是没有返回值

Callable接口可以重写call方法设定线程任务,这个任务是可以有返回值的

1.2 Callable接口的使用

此时我们使用Callable接口来再次完成上文提到的打印结果的任务

public class Demo_1101_Callable {
    public static void main(String[] args) {
        // 实现Callable接口,定义任务
        Callable<Integer> callable  = new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println("执行运算...");
                int sum = 0;
                // 执行累加操作
                for (int i = 1; i <= 1000; i++) {
                    sum += i;
                }
                System.out.println("执运算完成...");
                return sum;
            }
        };

        // Callable要配合FutureTask一起使用,FutureTask用来获取Callable的执行结果
        FutureTask<Integer> futureTask = new FutureTask<>(callable);
        // FutureTask当做构造参数传入到Thread构造方法中
        Thread thread = new Thread(futureTask);
        // 启动线程
        thread.start();

        try {
            // 等待结果, 的时间可能被中断,会抛出InterruptedException
            Integer result = futureTask.get();
            // 打印结果
            System.out.println("执行结果是:" + result);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
            // 打印异常信息
            System.out.println("打印日志:" + e.getMessage());
        }


    }
}

结果显示正确,我们在call方法实现了代码逻辑之后返回sum值,那么我们该如何启动这个任务,代码中我们能看到这么一段

// Callable要配合FutureTask一起使用,FutureTask用来获取Callable的执行结果
FutureTask<Integer> futureTask = new FutureTask<>(callable);
// FutureTask当做构造参数传入到Thread构造方法中
Thread thread = new Thread(futureTask);
// 启动线程
thread.start();

我们把callable传入了FutureTask类,这是一个什么类,我们来观察源码

可以看到FutureTask类接入了RunnableFuture接口,这个接口又是继承了Runnable接口的,这意味着FutureTask有着可被线程执行的特性,也就是可以直接提交给Thread执行。

除了提交任务给线程这个功能,FutureTask可以获取Callable的返回结果,调用get方法之后,会阻塞等待,直到任务被执行完成之后返回结果。

1.3 Runnable接口和Callable接口的相同与不同

  • Runnable接口要实现run方法,没有返回值;Callable接口要实现call方法,有返回值
  • Callable的call方法可以抛出异常,Runnable的run方法不能抛出异常
  • Callable接口要配合FutureTask类一起使用,之后传入给Thread类,调用get方法获取结果;Runnable接口可以直接传入Thread类,执行任务
  • 两个接口都是用来表述线程任务的接口

二、ReentrantLock锁

2.1 ReentrantLock的概念

ReentrantLock 是 JUC包中提供的可重入互斥,具备更灵活的锁控制能力,是处理多线程同步的核心工具之一。和synchronized锁定位类似,都是用来实现互斥效果,保证线程安全

2.2 ReentrantLock的使用

public class Demo_1102 {
    public static void main(String[] args) throws InterruptedException {
        // 创建一个ReentrantLock的对象
        ReentrantLock lock = new ReentrantLock();

        // 加锁
        lock.lock();
        // 被锁定的代码

        // 释放锁
        lock.unlock();

        // 尝试加锁,如果能获取到锁直接返回true,执行加锁的逻辑,如果获取不到锁返回false,执行其他的逻辑
        lock.tryLock();

        // 尝试加锁,并指定等待时间
        lock.tryLock(1, TimeUnit.SECONDS);
    }
}

使用可重入锁需要先创建一个ReentrantLock对象,用这个对象调用加锁解锁方法,ReentrantLock还有尝试加锁并且指定时间等方法

方法 用法
lock() 加锁,如果获取不到锁就死等
trylock() 尝试加锁,如果能获取到锁直接返回true,执行加锁的逻辑,如果获取不到锁返回false,直接退出
trylock(超时时间,时间单位) 尝试加锁,获取到锁就返回true,如果获取不到锁并且超时返回false
unlock() 释放锁

如果在lock方法和unlock方法之间,业务代码执行一半代码抛出异常了,此时释放锁的代码就无法执行了,该如何避免这种情况

使用try-finally包围,把unlock解锁方法放到finally代码块中,这样不管是否会抛出异常,最后都会执行unlock解锁方法

public class Demo_1103 {
    public static void main(String[] args) throws Exception {
        // 初始化一个锁
        ReentrantLock lock = new ReentrantLock();
        try {
            // 开始执行业务代码之前先上锁
            lock.lock();
            System.out.println("业务代码执行中....");
            TimeUnit.SECONDS.sleep(3);
            throw new Exception("执行出现异常");
        } finally {
            // 无论任何时候都可以释放锁
            lock.unlock();
            System.out.println("锁已释放");
        }
    }
}

观察代码结果,我们抛出异常之后还是可以释放锁

2.3 公平锁

我们可以用ReentrantLock创建一个公平锁

我们可以传入一个布尔型参数,如果为true则是公平锁,如果为false就是一个非公平锁

2.4 读写锁

JUC包中还有读写锁ReentrantReadWriteLock

public class Demo_1106 {
    public static void main(String[] args) {
        // 创建一个读写锁
        ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();

        // 获取读锁
        ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock();
        // 获取写锁
        ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock();

        // 读锁加锁,共享锁,多个读锁可以共存
        readLock.lock();
        // 读锁解锁
        readLock.unlock();

        // 写锁加锁,排他锁,多个锁不能共存
        writeLock.lock();
        // 写锁解锁
        writeLock.unlock();

    }
}

我们来测试一下读锁和写锁的实现效果

public class Demo_1107 {
    public static void main(String[] args) throws InterruptedException {
        Counter1107 counter = new Counter1107();

        // 创建两个线程对一个变量进时累加
        Thread t1 = new Thread(() -> {
            // 5万次
            for (int i = 0; i < 50000; i++) {
                // 加读锁
                counter.increase();
            }
        });
        // 线程2
        Thread t2 = new Thread(() -> {
            // 5万次
            for (int i = 0; i < 50000; i++) {
                // 加读锁
                counter.increase();
            }
        });

        // 启动线程
        t1.start();
        t2.start();

        // 等待线程执行完成
        t1.join();
        t2.join();

        // 查看运行结果
        System.out.println("count = " + counter.count);
    }
}

class Counter1107 {
    // 多线程环境中修改变量加volatile
    public volatile int count = 0;

    // 定义一个读写锁对象
    ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();

    /**
     * 加读锁
     * 存在线程安全问题
     *
     */
    public void increase() {

        // 读锁对象
        ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock();

        try {
            // 加读锁
            readLock.lock();
            count++;
        } finally {
            // 释放锁
            readLock.unlock();
        }
    }
}

可以看到结果不是预期结果,加读锁进行多线程修改共享变量的时候出现了线程安全问题,说明读锁是可以共存的,多个读锁并不能保证原子性

public class Demo_1107 {
    public static void main(String[] args) throws InterruptedException {
        Counter1107 counter = new Counter1107();

        // 创建两个线程对一个变量进时累加
        Thread t1 = new Thread(() -> {
            // 5万次
            for (int i = 0; i < 50000; i++) {
                // 加写锁
                counter.increase1();
            }
        });

        // 线程2
        Thread t2 = new Thread(() -> {
            // 5万次
            for (int i = 0; i < 50000; i++) {
                // 加写锁
                counter.increase1();
            }
        });

        // 启动线程
        t1.start();
        t2.start();

        // 等待线程执行完成
        t1.join();
        t2.join();

        // 查看运行结果
        System.out.println("count = " + counter.count);
    }
}

class Counter1107 {
    // 多线程环境中修改变量加volatile
    public volatile int count = 0;

    // 定义一个读写锁对象
    ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();

    /**
     * 加写锁
     */
    public void increase1() {
        // 写锁对象
        ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock();
        try {
            // 加写锁
            writeLock.lock();
            count++;
        } finally {
            // 释放锁
            writeLock.unlock();
        }
    }


}

我们这次加了两个线程都调用了加写锁的方法,结果就是预期结果了,说明写锁是不可以共存的,没有出现线程安全问题

public class Demo_1107 {
    public static void main(String[] args) throws InterruptedException {
        Counter1107 counter = new Counter1107();

        // 创建两个线程对一个变量进时累加
        Thread t1 = new Thread(() -> {
            // 5万次
            for (int i = 0; i < 50000; i++) {
                // 加读锁
                counter.increase();
            }
        });

        // 线程2
        Thread t2 = new Thread(() -> {
            // 5万次
            for (int i = 0; i < 50000; i++) {
                // 加写锁
                counter.increase1();
            }
        });

        // 启动线程
        t1.start();
        t2.start();

        // 等待线程执行完成
        t1.join();
        t2.join();

        // 查看运行结果
        System.out.println("count = " + counter.count);
    }
}

class Counter1107 {
    // 多线程环境中修改变量加volatile
    public volatile int count = 0;

    // 定义一个读写锁对象
    ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();

    /**
     * 加读锁
     * 存在线程安全问题
     *
     */
    public void increase() {

        // 读锁对象
        ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock();

        try {
            // 加读锁
            readLock.lock();
            count++;
        } finally {
            // 释放锁
            readLock.unlock();
        }
    }

    /**
     * 加写锁
     */
    public void increase1() {
        // 写锁对象
        ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock();

        try {
            // 加写锁
            writeLock.lock();
            count++;
        } finally {
            // 释放锁
            writeLock.unlock();
        }
    }


}

我们这次一个线程加读锁,一个线程加写锁,再次验证结果,结果还是正确的,说明了写锁和读锁也是不可共存的,满足原子性

2.5 休眠和唤醒

使用ReentrantLock我们可以针对性的对一些线程进行阻塞和唤醒

我们拿男女厕所的场景来说,当男厕所满了之后可以对男厕所外排队的人进行阻塞等待,等有人出来之后可以只唤醒男厕所排队的人,女厕所同理。

在代码中我们通过一个Condition类来实现

public class Demo_1108 {
    public static void main(String[] args) throws InterruptedException {
        // 定义一把锁
        ReentrantLock lock = new ReentrantLock();
        
        // 定义很多个休眠与唤醒条件
        // 条件1,维护男厕所的线程队列
        Condition male = lock.newCondition();
        // 条件2,维护女厕所的线程队列
        Condition female = lock.newCondition();
    }
}

通过Condition就可以实现有条件的让线程等待或唤醒,我们来看一下Condition类的源码

ConditionObject 是 Condition 的默认实现,内部维护条件队列的头尾节点(firstWaiter 和lastWaiter)

👉 结构:条件队列是一个单向链表,由 ConditionNode 节点组成,每个节点对应一个等待线程

ConditionNode 继承自 AQS 的 Node 类,新增 nextWaiter 字段用于维护单向链表的节点顺序

AQS 的抽象 Node 类,定义了节点的前驱 / 后继指针(prev/next)、等待线程(waiter)和节点状态(status)

简单来说就是一个 Condition 实例对应维护一个独立的线程等待队列,所以我们可以针对性的进行唤醒和等待

        Condition male = lock.newCondition();
        // 条件2
        Condition female = lock.newCondition();
        // 根据不同的条件进行阻塞等待
        male.await();
        // 根据不同的条件进行唤醒
        male.signal();      // 唤醒相应队列中的一个线程
        male.signalAll();   // 唤醒相应队列中的所有线程


        // 根据不同的条件进行阻塞等待
        female.await();
        // 根据不同的条件进行唤醒
        female.signal();
        female.signalAll();

await()是线程阻塞等待操作,signal()是唤醒相应队列中的一个线程,signalAll()是唤醒相应队列中的所有线程

我们拿就餐排队为例给出测试代码来测试方法的使用

public class Demo_1108 {
    public static void main(String[] args) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        // 男性等待队列
        Condition male = lock.newCondition();
        // 女性等待队列
        Condition female = lock.newCondition();

        // 创建3个男性线程(等待就餐)
        for (int i = 1; i <= 3; i++) {
            new Thread(() -> {
                lock.lock();
                try {
                    System.out.println(Thread.currentThread().getName() + ":等待就餐(进入男性队列)");
                    male.await(); // 进入男性条件队列等待
                    System.out.println(Thread.currentThread().getName() + ":被唤醒,开始就餐");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }, "男性线程" + i).start();
        }

        // 创建2个女性线程(等待就餐)
        for (int i = 1; i <= 2; i++) {
            new Thread(() -> {
                lock.lock();
                try {
                    System.out.println(Thread.currentThread().getName() + ":等待就餐(进入女性队列)");
                    female.await(); // 进入女性条件队列等待
                    System.out.println(Thread.currentThread().getName() + ":被唤醒,开始就餐");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }, "女性线程" + i).start();
        }

        // 主线程休眠1秒,确保所有线程都进入等待状态
        Thread.sleep(1000);

        // 唤醒操作(必须在锁内执行)
        lock.lock();
        try {
            System.out.println("\n=== 开始唤醒 ===");
            male.signal(); // 唤醒1个男性线程
            Thread.sleep(500); // 观察效果
            female.signalAll(); // 唤醒所有女性线程
            Thread.sleep(500);
            male.signalAll(); // 唤醒剩余男性线程
        } finally {
            lock.unlock();
        }
    }
}

结果正确,我们正确让男生进入男生队列,女生进入女生队列,最后唤醒的时候也是针对性的唤醒不同性别的线程

2.6 ReentrantLock和synchronized的区别

  • synchronized是一个关键字,加锁解锁的过程是调用系统API;
  • ReentrantLock则是纯Java代码实现,是用户态的操作
  • synchronized使用的时候不用手动释放锁,进入修饰的代码块就是加锁,退出代码块就释放锁;
  • ReentrantLock使用的时候需要调用lock和unlock方法手动释放锁,使用更灵活,但是容易遗忘unlock操作,并且要放入finally代码块中
  • synchronized申请锁失败之后会一直阻塞等待,直到别的线程释放锁之后再去竞争;ReentrantLock可以通过trylock方法等待一段时间就会放弃,获取不到锁之后可以去执行其他逻辑代码
  • synchronized是非公平锁;
  • ReentrantLock则默认是非公平锁,可以通过构造方法传入true创建一个公平锁
  • synchronized是通过Object类的wait/notify/notifyAll方法实现等待唤醒
  • ReentrantLock则是要搭配Condition类调用await/signal/signalAll方法实现精确控制线程等待-唤醒
Logo

汇聚全球AI编程工具,助力开发者即刻编程。

更多推荐