Java 并发编程

Java 并发编程线程创建 线程方法 线程同步 线程池 线程状态 java 并发编程

大家好,欢迎来到IT知识分享网。

在这里插入图片描述

一、线程创建

1.1 继承 Thread 类

通过继承 Thread 类来创建线程是最简单的方法之一。只需要创建一个继承自 Thread 的子类,并重写其 run() 方法,然后通过调用子类的 start() 方法来启动线程。如果 JVM 采用 1:1 的线程模型,start() 方法底层会通过 POSIX 线程库中的 pthread_create() 创建一个内核线程,并将 Java 中创建的线程映射到这个内核线程中,不过不同的 JVM 具有不同的映射方案。

这种方法的优点是简单易用,适用于简单的线程逻辑。不过由于 Java 不支持多重继承,因此通过继承 Thread 类来创建线程会限制类的继承关系。

通过继承 Thread 类来创建线程:

package atreus.ink; import java.lang.management.ManagementFactory; public class MyThread extends Thread { 
    @Override public void run() { 
    long threadId = Thread.currentThread().getId(); String processId = ManagementFactory.getRuntimeMXBean().getName().split("@")[0]; System.out.println("My pid is " + processId + ", my tid is " + threadId + "."); } } 
package atreus.ink; import java.lang.management.ManagementFactory; public class Main { 
    public static void main(String[] args) { 
    Thread thread = new MyThread(); thread.start(); long threadId = Thread.currentThread().getId(); String processId = ManagementFactory.getRuntimeMXBean().getName().split("@")[0]; System.out.println("My pid is " + processId + ", my tid is " + threadId + "."); } } 
My pid is 66201, my tid is 13. My pid is 66201, my tid is 1. 

注意事项:主线程启动子线程需要调用 start() 方法而不是 run() 方法,调用 run() 方法会将线程对象当作普通的 Java 对象来进行方法调用,并不会向操作系统注册线程,实际还是单线程执行。


1.2 实现 Runnable 接口

通过实现 Runnable 接口来创建线程是更加灵活的方法。通过这种方式,一个类既可以实现其他接口,又可以创建线程。

通过实现 Runnable 接口来创建线程:

package atreus.ink; import java.lang.management.ManagementFactory; public class MyRunnable implements Runnable{ 
     @Override public void run() { 
     long threadId = Thread.currentThread().getId(); String processId = ManagementFactory.getRuntimeMXBean().getName().split("@")[0]; System.out.println("My pid is " + processId + ", my tid is " + threadId + "."); } } 
package atreus.ink; import java.lang.management.ManagementFactory; public class Main { 
     public static void main(String[] args) { 
     Runnable runnable = new MyRunnable(); new Thread(runnable).start(); long threadId = Thread.currentThread().getId(); String processId = ManagementFactory.getRuntimeMXBean().getName().split("@")[0]; System.out.println("My pid is " + processId + ", my tid is " + threadId + "."); } } 
My pid is 7388, my tid is 14. My pid is 7388, my tid is 1. 

此外,对于通过实现 Runnable 接口的创建方法,还可以通过匿名内部类和 Lambda 进行代码简化:

package atreus.ink; import java.lang.management.ManagementFactory; public class Main { 
     public static void main(String[] args) { 
     // 1.通过匿名内部类进行代码简化 Runnable runnable = new Runnable() { 
     @Override public void run() { 
     long threadId = Thread.currentThread().getId(); String processId = ManagementFactory.getRuntimeMXBean().getName().split("@")[0]; System.out.println("My pid is " + processId + ", my tid is " + threadId + "."); } }; new Thread(runnable).start(); // 2.通过Lambda表达式进行简化 new Thread(() -> { 
     long threadId = Thread.currentThread().getId(); String processId = ManagementFactory.getRuntimeMXBean().getName().split("@")[0]; System.out.println("My pid is " + processId + ", my tid is " + threadId + "."); }).start(); long threadId = Thread.currentThread().getId(); String processId = ManagementFactory.getRuntimeMXBean().getName().split("@")[0]; System.out.println("My pid is " + processId + ", my tid is " + threadId + "."); } } 

1.3 实现 Callable 接口

当使用 Callable 接口来创建线程时,可以实现一些需要返回结果的异步操作。与使用 Runnable 不同,Callablecall() 方法可以返回一个值,也可以抛出异常。

通过实现 Callable 接口创建线程,同时获取返回值:

package atreus.ink; import java.lang.management.ManagementFactory; import java.util.concurrent.Callable; public class MyCallable implements Callable<String> { 
      @Override public String call() throws Exception { 
      long threadId = Thread.currentThread().getId(); String processId = ManagementFactory.getRuntimeMXBean().getName().split("@")[0]; return "My pid is " + processId + ", my tid is " + threadId + "."; } } 
package atreus.ink; import java.lang.management.ManagementFactory; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; public class Main { 
      public static void main(String[] args) throws ExecutionException, InterruptedException { 
      Callable<String> callable = new MyCallable(); FutureTask<String> task = new FutureTask<>(callable); new Thread(task).start(); System.out.println(task.get()); // get方法会使主线程等待子线程执行完毕,然后获取结果 long threadId = Thread.currentThread().getId(); String processId = ManagementFactory.getRuntimeMXBean().getName().split("@")[0]; System.out.println("My pid is " + processId + ", my tid is " + threadId + "."); } } 
My pid is 25580, my tid is 14. My pid is 25580, my tid is 1. 

当然,这种创建方法也可以通过匿名内部类和 Lambda 表达式简化。

package atreus.ink; import java.lang.management.ManagementFactory; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; public class Main { 
      public static void main(String[] args) throws ExecutionException, InterruptedException { 
      FutureTask<String> task = new FutureTask<>(() -> { 
      long threadId = Thread.currentThread().getId(); String processId = ManagementFactory.getRuntimeMXBean().getName().split("@")[0]; return "My pid is " + processId + ", my tid is " + threadId + "."; }); new Thread(task).start(); System.out.println(task.get()); long threadId = Thread.currentThread().getId(); String processId = ManagementFactory.getRuntimeMXBean().getName().split("@")[0]; System.out.println("My pid is " + processId + ", my tid is " + threadId + "."); } } 

二、线程方法

Thread 类提供了很多与线程操作相关的方法:

常用方法 说明
public static Thread currentThread() 获取当前执行的线程对象
public void run() 线程的任务方法
public void start() 启动线程
public String getName() 获取当前线程的名称,线程名称默认是 Thread-索引
public void setName(String name) 为线程设置名称
public static void sleep(long time) 让当前执行的线程休眠
public final void join() 调用这个方法的线程将等待被调用的线程执行完成,然后再继续执行,类似于 POSIX 线程库中的 pthread_join

join() 的基本使用示例:

public class Main { 
       public static void main(String[] args) { 
       Thread thread = new Thread(() -> { 
       System.out.println(Thread.currentThread().getName()); try { 
       Thread.sleep(3000); } catch (InterruptedException e) { 
       Thread.currentThread().interrupt(); } System.out.println(Thread.currentThread().getName()); }); thread.start(); thread.join(); // 主线程会在此处等待子线程执行完毕 System.out.println(Thread.currentThread().getName()); } } 

注意:实际上即使主线程不 join(),子线程也能完成自己的任务,因为虚拟机进程只有在虚拟机中所有存活的线程都是守护线程时才会退出,不管是主线程还是子线程本质上都是非守护线程,因此主线程的退出不会影响子线程的继续执行。但如果通过 setDaemon(true) 将子线程设置为守护线程,那么主线程结束后虚拟机会立即结束。

public class Main { 
       public static void main(String[] args) { 
       Thread thread = new Thread(() -> { 
       System.out.println(Thread.currentThread().getName()); try { 
       Thread.sleep(3000); } catch (InterruptedException e) { 
       Thread.currentThread().interrupt(); } System.out.println(Thread.currentThread().getName()); }); thread.setDaemon(true); thread.start(); System.out.println(Thread.currentThread().getName()); } } 
Thread-0 main 

三、线程同步

3.1 锁

StampedLock 在某些场景下可以提供更好的并发性能,但也需要注意合理的使用,以避免过于复杂的代码结构和潜在的死锁情况。

synchronizedReentrantLock 都是基于悲观锁思想实现的,意味着它们假定在执行临界区代码期间会发生并发冲突。在高并发场景下,激烈的锁竞争可能会导致线程阻塞,从而降低性能。特别是在多读场景下,悲观锁可能引入大量的额外并发开销,因为每个读操作都需要获得独占锁。

相比之下,StampedLock乐观锁思想更适合多读场景。乐观锁假定数据操作不存在并发冲突,因此不会引起锁竞争,也不会导致线程阻塞和死锁。乐观锁通常在提交修改时才验证资源是否被其他线程修改。不过在多写场景下乐观锁会频繁失败和重试,这同样会对性能造成一定影响。

3.1.1 synchronized

synchronized 是 Java 中用于实现线程同步的关键字,它主要用于创建同步代码块同步方法,以确保在多线程环境下对共享资源的访问是安全的。通过使用 synchronized 可以避免多个线程同时访问共享资源而引发的并发问题,如竞态条件和数据不一致等。

在 Java 6 之前,synchronized 只有传统的重量级锁机制,直接通过对象内部的监视器(monitor)实现,存在性能上的瓶颈。在 Java 6 后,为了提高锁的获取与释放效率,JVM 引入了偏向锁轻量级锁两种新的锁机制。它们解决了在没有多线程竞争或基本没有竞争的场景下因使用传统锁机制带来的性能开销问题。不过这些锁机制由 JVM 根据实际情况进行自动选择,在实际编程中,我们通常无需显式地操作这些锁。一般情况下 JVM 会优先考虑偏向锁和轻量级锁,然后才会考虑是重量级锁。

偏向锁是指当一个线程第一次获取锁时,JVM 会在对象头和栈帧中的锁记录里存储锁偏向的线程 ID,以后这个线程再次获取该锁时,无需进行任何同步操作,直接获得锁,这样减少了无竞争情况下的锁操作开销。

轻量级锁采用乐观的 CAS 无锁竞争的方式获取锁,CAS 由 Unsafe 类中 compareAndSwap 方法实现,它会通过 JNI 调用 C++ 方法以内联汇编的形式完成相关操作,同时 compareAndSwap 方法还会通过检查引用和标志来避免 ABA 问题。

重量级锁依靠 monitor 机制实现,monitor 机制是操作系统提出来的一种高级原语,但属于语言范畴,由不同语言提供不同的实现。Java 中 monitor 机制的实现依赖于 ObjectMonitor,ObjectMonitor 是 JVM 内部基于 C++ 实现的一套机制,它为每个对象中都内置了一个 ObjectMonitor 对象,保证同一时刻只有一个线程能够获得指定对象的监视器(底层通过操作系统中的 mutex 实现)。因此一个对象可以作为 monitor object 被 synchronzied 关联,而访问同步方法或同步代码块的本质就是获取关联对象的监视器。


同步代码块

通过在代码块内使用 synchronized 关键字来创建同步代码块,它可以用来保护代码块,确保在同一时刻只有一个线程能够进入同步代码块。一个典型的用法是将需要同步的代码放在同步代码块中,并指定一个锁对象作为同步的依据。

虽然任意一个唯一的对象(比如一个字符串)都可以作为同步代码块的锁对象,但锁的粒度过大会导致并发安全问题,粒度过小会导致性能下降。类比同步方法,一般情况下,对于实例方法,通常使用 this 作为锁对象,对于静态方法,通常使用类的字节码对象 类名.class 作为锁对象。

同步代码块的基本使用示例:

package atreus.ink; public class Main { 
         private static int counter; private static final Object lock = new Object(); public static void main(String[] args) { 
         Runnable incrementTask = () -> { 
         for (int i = 0; i < 10000; i++) { 
         // 同步代码块 synchronized (lock) { 
         counter++; } } }; Thread thread1 = new Thread(incrementTask); Thread thread2 = new Thread(incrementTask); thread1.start(); thread2.start(); try { 
         thread1.join(); thread2.join(); } catch (InterruptedException e) { 
         e.printStackTrace(); } System.out.println("Final counter value: " + counter); } } 
Final counter value: 20000 

此外,在同步代码块中还可以通过 wait() 方法让当前线程进入等待状态,直到其他线程调用相同对象的 notify()notifyAll() 方法来唤醒它。这三个方法的实现也同样依赖于 monitor 机制,因此需要被绑定到指定的锁对象上。

wait()notify() 的基本使用示例:

package atreus.ink; public class Main { 
         public static void main(String[] args) { 
         final Object lock = new Object(); // 等待线程 Thread waiter = new Thread(() -> { 
         synchronized (lock) { 
         System.out.println("Waiter: Waiting for a notification..."); try { 
         lock.wait(); } catch (InterruptedException e) { 
         e.printStackTrace(); } System.out.println("Waiter: Got a notification!"); } }); // 通知线程 Thread notifier = new Thread(() -> { 
         synchronized (lock) { 
         System.out.println("Notifier: Performing some work..."); try { 
         Thread.sleep(2000); } catch (InterruptedException e) { 
         e.printStackTrace(); } System.out.println("Notifier: Work done, notifying the waiter..."); lock.notify(); } }); waiter.start(); notifier.start(); } } 
Waiter: Waiting for a notification... Notifier: Performing some work... Notifier: Work done, notifying the waiter... Waiter: Got a notification! 

同步方法

通过在方法定义处使用 synchronized 关键字来创建同步方法,它可以将整个方法体都变成一个同步代码块。同步方法底层通过隐式锁对象实现,只是锁的范围是整个方法代码。如果方法是实例方法,同步方法默认用 this 作为的锁对象。如果方法是静态方法,同步方法默认用 类名.class 作为的锁对象。

同步方法的优点是简单,可以很方便地实现线程同步。不过锁的范围较大,可能影响性能,因为其他不需要同步的代码也会被锁住。

同步方法的基本使用示例:

package atreus.ink; public class Main { 
          private static int counter; public static synchronized void increment() { 
          for (int i = 0; i < 10000; i++) { 
          counter++; } } public static void main(String[] args) { 
          Thread thread1 = new Thread(Main::increment); Thread thread2 = new Thread(Main::increment); thread1.start(); thread2.start(); try { 
          thread1.join(); thread2.join(); } catch (InterruptedException e) { 
          e.printStackTrace(); } System.out.println("Final counter value: " + counter); } } 
Final counter value: 20000 

3.1.2 ReentrantLock

ReentrantLock 是 Java 提供的一个可重入锁,默认为非公平锁,它相比于使用 synchronized 关键字具有更大的灵活性。通过 ReentrantLock,你可以显式地获取锁和释放锁,从而精确地控制同步范围。

ReentrantLock 提供了更多的功能,比如可重入性可定时的锁等待公平性设置等。但需要注意,使用 ReentrantLock 需要手动释放锁,因此务必在 finally 块中释放锁,以防止死锁情况的发生。

ReentrantLock 的基本使用示例:

package atreus.ink; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class Main { 
           private static final Lock lock = new ReentrantLock(); private static int counter; public static void main(String[] args) { 
           Runnable incrementTask = () -> { 
           for (int i = 0; i < 10000; i++) { 
           lock.lock(); try { 
           counter++; } finally { 
           // 放在finally块中保证锁一定能被释放 lock.unlock(); } } }; Thread thread1 = new Thread(incrementTask); Thread thread2 = new Thread(incrementTask); thread1.start(); thread2.start(); try { 
           thread1.join(); thread2.join(); } catch (InterruptedException e) { 
           e.printStackTrace(); } System.out.println("Final counter value: " + counter); } } 
Final counter value: 20000 

3.1.3 StampedLock

StampedLock 是 Java 提供的一个支持乐观读、悲观读和写操作的锁机制。它在 Java 8 中引入,通过使用乐观读锁来提供更高的并发性,同时支持升级为悲观读锁或写锁。不过它不可重入不支持条件变量 Conditon

StampedLock 提供了三种读写控制模式:

  1. 乐观读锁:乐观读锁是一种无锁操作,它假设没有写操作会发生。线程可以直接读取数据而无需获取锁,读取完成后通过校验版本信息来判断数据是否有效。如果数据有效,操作成功;如果数据无效,需要尝试其他方式来获取锁。乐观读锁适用于读多写少的场景。
  2. 悲观读锁:悲观读锁是常规的读锁,它会阻塞写操作,但不会阻塞其他读操作。悲观读锁适用于读多写多的场景,可以保证读操作之间的数据一致性。
  3. 写锁:写锁会阻塞其他的读操作和写操作,用于保护共享资源的写操作。

StampedLock 的基本使用示例:

package atreus.ink; import java.util.concurrent.locks.StampedLock; public class Main { 
            private static final StampedLock lock = new StampedLock(); private static int counter; public static void main(String[] args) { 
            Runnable incrementTask = () -> { 
            for (int i = 0; i < 10000; i++) { 
            long stamp = lock.writeLock(); // 获取写锁 try { 
            counter++; } finally { 
            lock.unlockWrite(stamp); // 释放写锁 } } }; Thread thread1 = new Thread(incrementTask); Thread thread2 = new Thread(incrementTask); thread1.start(); thread2.start(); try { 
            thread1.join(); thread2.join(); } catch (InterruptedException e) { 
            e.printStackTrace(); } System.out.println("Final counter value: " + counter); } } 

3.2 原子变量

除了 StampedLock 中的乐观读锁,java.util.concurrent.atomic 包下面 AtomicIntegerAtomicLongAtomicIntegerArrayAtomicReference 等原子变量类也是基于乐观锁的思想实现的。

不过普通的 AtomicInteger 可能会存在 ABA 问题,此时可以使用 AtomicStampedReference,它内部除了一个对象引用,还维护了一个可以自动更新的整数,通过标识版本来避免 ABA 问题。

AtomicInteger 的基本使用示例:

package atreus.ink; import java.util.concurrent.atomic.AtomicInteger; public class Main { 
             private static final AtomicInteger counter = new AtomicInteger(0); public static void main(String[] args) { 
             Runnable incrementTask = () -> { 
             for (int i = 0; i < 10000; i++) { 
             counter.incrementAndGet(); } }; Thread thread1 = new Thread(incrementTask); Thread thread2 = new Thread(incrementTask); thread1.start(); thread2.start(); try { 
             thread1.join(); thread2.join(); } catch (InterruptedException e) { 
             e.printStackTrace(); } System.out.println("Final counter value: " + counter.get()); } } 

AtomicReference 的基本使用示例:

package atreus.ink; import java.util.concurrent.atomic.AtomicReference; public class Main { 
             private static final AtomicReference<Integer> counterRef = new AtomicReference<>(0); public static void main(String[] args) { 
             Runnable incrementTask = () -> { 
             for (int i = 0; i < 10000; i++) { 
             while (true) { 
             Integer current = counterRef.get(); Integer updated = current + 1; if (counterRef.compareAndSet(current, updated)) { 
             break; } } } }; Thread thread1 = new Thread(incrementTask); Thread thread2 = new Thread(incrementTask); thread1.start(); thread2.start(); try { 
             thread1.join(); thread2.join(); } catch (InterruptedException e) { 
             e.printStackTrace(); } System.out.println("Final counter value: " + counterRef.get()); } } 
Final counter value: 20000 

3.3 ThreadLocal

3.3.1 基本原理

ThreadLocal 主要用于实例需要在多个方法中共享,但不希望被多线程共享的场景中,它可以在不同的线程中存储不同的值,每个线程都操作自己的独立副本,互不干扰。

下面的例子中通过 ThreadLocal 每个线程都可以独立地访问和修改自己的变量:

public class ThreadLocalExample { 
              // 创建一个 ThreadLocal 变量 private static final ThreadLocal<Integer> threadLocal = ThreadLocal.withInitial(() -> 0); public static void main(String[] args) { 
              // 启动多个线程,每个线程会有自己独立的 ThreadLocal 变量副本 for (int i = 0; i < 5; i++) { 
              new Thread(new Task()).start(); } } static class Task implements Runnable { 
              @Override public void run() { 
              // 获取当前线程的 ThreadLocal 变量值 Integer value = threadLocal.get(); System.out.println(Thread.currentThread().getName() + " initial value: " + value); // 修改当前线程的 ThreadLocal 变量值 value += 1; threadLocal.set(value); // 再次获取当前线程的 ThreadLocal 变量值 System.out.println(Thread.currentThread().getName() + " modified value: " + threadLocal.get()); } } } 

从实现上来讲,每一个 Thread 线程类都有一个类型为 ThreadLocal.ThreadLocalMap 的成员变量 threadLocals,即每个线程都有一个属于自己的 ThreadLocalMapThreadLocalMap 是一个自定义的哈希映射,其中每一个 Entry 都继承自 WeakReference<ThreadLocal<?>>,因此 key 对应 ThreadLocal弱引用value 对应与 ThreadLocal 对象相关联的对象。ThreadLocal 变量的读写本质上就是对 ThreadLocalMapEntry 的读写。

static class Entry extends WeakReference<ThreadLocal<?>> { 
              / The value associated with this ThreadLocal. */ Object value; Entry(ThreadLocal<?> k, Object v) { 
              super(k); value = v; } } 

ThreadLocalMap 通过弱引用探测式清理来避免可能发生的内存泄漏。

首先我们分析将 key 指定为虚引用的作用,下图为前面例子中 ThreadLocal 的内存分配,其中实线代表强引用,虚线代表弱引用。当我们在使用完 str 变量后,strThreadLocal 对象的强引用回收,此时 ThreadLocal 对象仅有一个弱引用,此时如果发生 GC,那么 ThreadLocal 对象的内存就能够被回收,因此避免了 ThreadLocal 对象的内存泄露。

在这里插入图片描述

不过,ThreadLocal 被回收后 ThreadLocalMap 中就会出现 keynullEntry,如果线程迟迟不能销毁,那么便会出现下图中的 ThreadA -> Thread 对象 -> ThreaLocalMap 对象 -> Entry -> value -> String 对象 强引用链,String 对象始终无法释放。为了避免这种情况,在调用 ThreadLocalgetset 以及 remove 方法时,会探测式清理 ThreadLocalMapkeynullvalue 对象的内存空间,从而避免了与 ThreadLocal 对象相关联的对象的内存泄露。

在这里插入图片描述

3.3.2 内存泄漏和脏数据

ThreadLocal 只有在使用完毕后没有及时 remove 且线程一直未被销毁(如线程池中的核心线程)的前提下才会发生内存泄露,任意一个条件不满足都不会发生内存泄漏。

当一个线程调用 ThreadLocalset 方法设置变量时,当前线程的 ThreadLocalMap 里就会存放一个 Entry,这个 EntrykeyThreadLocal 的弱引用,value 为保存的值对象(上图 String 对象)。

如果当前线程一直存在且没有调用 ThreadLocalremove 方法,并且这时候在其他地方还有对 ThreadLocal 的强引用,则当前线程的 ThreadLocalMap 变量里面会存在对 ThreadLocal 变量的弱引用和对 value 对象的强引用,它们是不会被释放的,这就会造成内存泄漏

如果强引用能够被回收,由于 ThreadLocalMap 里面的 key 是弱引用,因此 ThreadLocal 变量能在 GC 的时候回收。但是,只要探测式清理不能被触发,值对象的长期存活还是会造成内存泄漏

因此需要尽量避免在线程池核心线程等长久存活的线程中使用 ThreadLocal 变量,同时使用后及时 remove 释放资源。此外,在线程池中使用 ThreadLocal 也需要注意脏数据问题。

3.4 CountDownLatch

CountDownLatch 是一个同步辅助类,它允许一个或多个线程等待其他线程完成操作。CountDownLatch 通过一个计数器来实现一个线程等待其他线程完成某个操作,当计数器的值减为零时,等待的线程被释放。

CountDownLatch 主要提供了以下方法:

  • CountDownLatch(int count):构造方法,用于初始化计数器,指定计数器的初始值为 count
  • void await() throws InterruptedException:当调用线程调用此方法时,它会一直等待,直到计数器减为零。如果计数器不为零,线程将被阻塞。
    boolean await(long timeout, TimeUnit unit) throws InterruptedException:在指定的时间内等待计数器减为零,如果在指定时间内计数器未减至零,线程将被唤醒。
  • void countDown():每个被等待的线程执行完任务后,都应该调用此方法来减小计数器的值。

以下是一个 CountDownLatch 的简单的示例:

package atreus.ink; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Main { 
              public static void main(String[] args) throws InterruptedException { 
              ExecutorService es = Executors.newFixedThreadPool(10); CountDownLatch latch = new CountDownLatch(10); Runnable task = () -> { 
              System.out.println("task"); latch.countDown(); }; long begin = System.currentTimeMillis(); for (int i = 0; i < 10; i++) { 
              es.submit(task); } latch.await(); long end = System.currentTimeMillis(); System.out.println("time = " + (end - begin)); } } 
task task task task task task task task task task time = 1 

3.5 Future 和 CompletableFuture

Future 是一个接口,允许我们提交一个任务给线程池或其他异步执行机制,并且在未来获取该任务的结果。它通常用于处理耗时的操作,以便不阻塞主线程,从而提高程序的性能和响应性。

Future 接口相关方法:

  • get():用来获取异步任务的结果,如果任务尚未完成,调用 get() 将会阻塞当前线程,直到任务完成并返回结果。
  • isDone():用来检查异步任务是否已经完成,如果任务已经完成,它将返回 true,否则返回 false
  • cancel(boolean mayInterruptIfRunning):用于取消异步任务的执行,mayInterruptIfRunning 参数用于指定是否应该中断正在执行的任务。如果任务成功取消,get() 方法将会抛出 CancellationException

Future 接口通常通过 ExecutorService 接口的实现来使用,ExecutorService 提供了一种提交任务并获取 Future 的方式,从而管理线程池中的任务。

此外,Java 8 中还引入了 CompletableFuture 类,它实现了 Future 接口并提供了更丰富的功能,包括支持函数式编程、组合多个异步任务等。

Future 异步执行任务示例:

import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; class Solution { 
              public static void main(String[] args) { 
              ExecutorService executor = Executors.newFixedThreadPool(2); Future<Integer> future = executor.submit(() -> { 
              Thread.sleep(2000); // 模拟一个耗时操作 return 42; }); try { 
              System.out.println("等待任务完成..."); Integer result = future.get(); System.out.println("任务完成,结果为:" + result); } catch (InterruptedException | ExecutionException e) { 
              e.printStackTrace(); } executor.shutdown(); } } 

使用 CompletableFuture 分批处理数据:

Java 并发编程

使用 CompletableFuture 对多个异步任务进行编排:

import java.util.concurrent.CompletableFuture; public class Main { 
              / * 任务之间的传递关系 * cf1 -> cf3 * cf1 + cf2 -> cf4 * cf2 -> cf5 * cf3 + cf4 + cf5 -> cf6 */ public static void main(String[] args) { 
              CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "1"); CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> "2"); CompletableFuture<String> cf3 = cf1.thenApply((result1) -> result1 + "3"); CompletableFuture<String> cf4 = cf1.thenCombine(cf2, (result1, result2) -> result1 + result2 + "4"); CompletableFuture<String> cf5 = cf2.thenApply((result2) -> result2 + "5"); CompletableFuture<String> cf6 = CompletableFuture.allOf(cf3, cf4, cf5).thenApply((Void) -> { 
              // 由于 thenApply() 的缘故,这里的 join() 并不会阻塞,只是单纯获取结果 String result3 = cf3.join(); String result4 = cf4.join(); String result5 = cf5.join(); return result3 + result4 + result5 + "6"; }); String finalResult = cf6.join(); System.out.println(finalResult); } } 

3.5 volatile

volatile 关键字主要有两层语义:

  • 保证多线程环境下共享变量操作的可见性:参考 Java 内存模型(JMM)中先行发生(Happens-Before)原则对 volatile 变量规则的描述,对于一个 volatile 变量,如果对于这个变量的写操作先行发生于这个变量的读操作,那么这个写操作所产的影响对于后续的读操作是可见的。
  • 禁止指令重排序:编译器在编译时会在生成的字节码中插入特定的内存屏障指令,确保在 volatile 变量读写操作前后的代码不会被重排序。具体来说,会在 volatile 变量写操作之后,读操作之前插入屏障,因此执行到 volatile 变量读写操作时,前面的操作一定已经执行完成,后面的操作一定还未开始。

四、线程池

4.1 ThreadPoolExecutor

Java 中的线程池接口为 ExecutorService,一个常用的实现类为 ThreadPoolExecutor,其构造函数为:

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 

参数说明

  • corePoolSize线程池的核心线程数,即任务队列未达到队列容量时,最大可以同时运行的线程数量。即使线程是空闲的,它们也不会被销毁,除非线程池被关闭。
  • maximumPoolSize线程池的最大线程数。在没有核心线程空闲的情况下,如果任务数量增加,线程池可以扩展到最大线程数。如果任务数量继续增加,超过线程池最大大小的任务将会被拒绝执行。
  • keepAliveTime非核心线程的最大空闲时间。当线程池中的线程数量超过 corePoolSize,多余的非核心线程会在空闲时间超过 keepAliveTime 后被销毁,以减少资源占用。
  • unit时间单位,用于指定 keepAliveTime 的时间单位。
  • workQueue用于存储等待执行的任务的阻塞队列。当所有核心线程都忙碌时,新任务将被放入队列等待执行。常用的队列类型包括 LinkedBlockingQueue(最大长度为 Integer.MAX_VALE,即无界队列)、ArrayBlockingQueue(有界队列)、PriorityBlockingQueue(基于堆的优先级队列)等。
  • threadFactory用于创建线程的工厂。可以通过提供自己实现的 ThreadFactory 自定义线程的创建过程。
  • handler拒绝策略,用于处理无法提交给线程池执行的任务。当任务数量超过线程池最大大小且队列已满时,将使用拒绝策略处理任务。

新任务拒绝策略

策略 详解
ThreadPoolExecutor.AbortPolicy 丢弃任务并抛出 RejectedExecutionException 异常,是默认的策略
ThreadPoolExecutor.DiscardPolicy 丢弃任务,但是不抛出异常,这是不推荐的做法
ThreadPoolExecutor.DiscardOldestPolicy 抛弃队列中等待最久的任务,然后把当前任务加入队列中
ThreadPoolExecutor.CallerRunsPolicy 由主线程负责调用任务的 run() 方法从而绕过线程池直接执行

注意事项

  • 新任务提交时发现核心线程都在忙任务队列也满了,并且还可以创建临时线程,此时才会创建临时线程
  • 核心线程和临时线程都在忙任务队列也满了,新的任务过来的时候才会开始拒绝任务

常用方法

方法名称 说明
void execute(Runnable command) 执行 Runnable 任务
Future<T> submit(Callable<T> task) 执行 Callable 任务,返回一个 Future 对象,用于获取线程返回的结果
void shutdown() 等全部任务执行完毕后,再关闭线程池
List<Runnable> shutdownNow() 立刻关闭线程池,停止正在执行的任务,并返回队列中未执行的任务

ThreadPoolExecutor 的基本使用示例:

public class MyRunnable implements Runnable { 
               @Override public void run() { 
               System.out.printf("[%s] %s\n", Thread.currentThread().getName(), LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); try { 
               Thread.sleep(3000); } catch (InterruptedException e) { 
               throw new RuntimeException(e); } } } 
public class Main { 
               public static void main(String[] args) { 
               ExecutorService pool = new ThreadPoolExecutor(2, 3, 8, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); Runnable target = new MyRunnable(); pool.execute(target); // 核心线程 pool.execute(target); // 核心线程 pool.execute(target); // 任务队列等待 pool.execute(target); // 任务队列等待 pool.execute(target); // 任务队列满,启动一个临时线程 pool.execute(target); // 核心线程和临时线程忙,同时任务队列已满,拒绝任务 pool.shutdown(); } } 
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task atreus.ink.MyRunnable@7a0ac6e3 rejected from java.util.concurrent.ThreadPoolExecutor@71be98f5[Running, pool size = 3, active threads = 3, queued tasks = 2, completed tasks = 0] at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2055) at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:825) at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1355) at atreus.ink.Main.main(Main.java:20) [pool-1-thread-2] 2023-08-30 15:57:44 [pool-1-thread-1] 2023-08-30 15:57:44 [pool-1-thread-3] 2023-08-30 15:57:44 [pool-1-thread-1] 2023-08-30 15:57:47 [pool-1-thread-2] 2023-08-30 15:57:47 

使用 submit() 以捕获异常:

public class ThreadPoolExceptionHandling { 
               @SuppressWarnings("all") public static void main(String[] args) { 
               ExecutorService executorService = Executors.newFixedThreadPool(1); Future<Integer> future = executorService.submit(() -> { 
               throw new RuntimeException("Exception in task"); }); try { 
               // 调用 get() 方法获取任务执行结果,如果任务抛出了异常,这里会抛出 ExecutionException Integer result = future.get(); System.out.println("Task result: " + result); } catch (InterruptedException | ExecutionException e) { 
               // 处理任务执行中抛出的异常 e.printStackTrace(); } finally { 
               executorService.shutdown(); } } } 
java.util.concurrent.ExecutionException: java.lang.RuntimeException: Exception in task at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:192) at ThreadPoolExceptionHandling.main(ThreadPoolExceptionHandling.java:16) Caused by: java.lang.RuntimeException: Exception in task at ThreadPoolExceptionHandling.lambda$main$0(ThreadPoolExceptionHandling.java:10) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) 

4.2 Executors

Executors 是一个线程池的工具类,提供了很多静态方法用于返回不同特点的线程池对象。

方法名称 说明
public static ExecutorService newFixedThreadPool(int nThreads) 创建固定线程数量的线程池,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程替代它
public static ExecutorService newSingleThreadExecutor() 创建只有一个线程的线程池对象,如果该线程出现异常而结束,那么线程池会补充一个新线程
public static ExecutorService newCachedThreadPool() 线程数量随着任务增加而增加,如果线程任务执行完毕且空闲了 60s 则会被回收掉。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) 创建一个线程池,可以实现在给定的延迟后运行任务或者定期执行任务

newScheduledThreadPool 的基本使用示例:

package atreus.ink; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; public class MyRunnable implements Runnable { 
                @Override public void run() { 
                System.out.printf("[%s] %s\n", Thread.currentThread().getName(), LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); } } 
package atreus.ink; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public class Main { 
                public static void main(String[] args) throws InterruptedException { 
                ScheduledExecutorService pool = Executors.newScheduledThreadPool(2); Runnable target = new MyRunnable(); // 延迟1秒后执行target任务 pool.schedule(target, 1, TimeUnit.SECONDS); // 延迟2秒后,每隔3秒执行一次target任务 pool.scheduleAtFixedRate(target, 2, 3, TimeUnit.SECONDS); Thread.sleep(10 * 1000); pool.shutdown(); } } 
[pool-1-thread-1] 2023-08-30 16:26:33 [pool-1-thread-2] 2023-08-30 16:26:34 [pool-1-thread-2] 2023-08-30 16:26:37 [pool-1-thread-2] 2023-08-30 16:26:40 

五、线程状态

在这里插入图片描述

java.lang.Thread.State 中定义了六种线程状态,可以通过 getState() 方法获取当前线程的状态。

线程状态 说明
NEW 通过 new 关键字新建一个线程,但还未调用 start() 方法
RUNNABLE 调用 start() 后等待调度(就绪)、正在运行
BLOCKED 等待 synchronized 监视器锁时,陷入阻塞状态
WAITING 等待其他线程执行特定的操作
TIMED_WAITING 具有指定等待时间的等待状态
TERMINATED 线程完成执行,变为终止状态

sleep()yield() 的区别:

  • sleep()强制线程进入超时等待状态,时间到了之后才会转入就绪状态,是一种相对确定的暂停方式。而 yield() 方法会提示当前线程进入就绪状态,只是一种提示性的暂停,有可能被操作系统忽略。
  • 使用 sleep() 方法需要处理中断异常,而 yield() 不用。

BLOCKED 和 WAITING 均属于线程的阻塞等待状态,区别如下:

  • BLOCKED 是 synchronized 锁竞争失败后被动触发的状态,WAITING 是人为主动触发的状态。
  • BLOCKED 的唤醒时自动触发的,而 WAITING 状态是必须要通过特定的方法来主动唤醒,比如 Object.notify() 方法可以唤醒 Object.wait() 方法阻塞的线程,LockSupport.unpark() 可以唤醒 LockSupport.park() 方法阻塞的线程。

参考:

在这里插入图片描述

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/110600.html

(0)
上一篇 2026-01-30 15:15
下一篇 2026-01-30 15:26

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信