并发包概述

提前话:

进行myCat摸索时到处可见的并发包的原子性数据操作,与其摸瞎子过河,不如先把瞎子摸一遍!!!🤔 于是便有了下面这片的技术博客(关于concurrent概述)🤨

Overview of the java.util.concurrent(概述)

原文网址

Main Components(主要构成)

The java.util.concurrent contains way too many features to discuss in a single write-up. In this article, we will mainly focus on some of the most useful utilities from this package like:(在这片文章中如果一次撰写或讨论concurrent的功能、方式会太多,所以我们将大部分的精力集中于最常使用的实用程序)

  • Executor(执行者)
  • ExecutorService(执行器服务)
  • ScheduledExecutorService(执行器服务时间表)
  • Future()
  • CountDownLatch(倒计时)
  • CyclicBarrier(循环屏障??)
  • Semaphore
  • ThreadFactory
  • BlockingQueue
  • DelayQueue
  • Locks
  • Phaser

You can also find many dedicated articles to individual classes here.

Executor(执行器)

Executor\ is an interface that represents an object that executes provided tasks.

(Executor 是一个代表执行提供任务的一个接口对象)

It depends on the particular implementation (from where the invocation is initiated) if the task should be

它依赖于一个特定的实现(从启动调用的位置)如果这个任务需要运行在一个新线程或者当前线程

run on a new or current thread. Hence, using this interface, we can decouple the task execution flow from

因此,使用这个接口 我们可以将任务执行流与实际任务执行机制进行解耦(decouple)。

the actual task execution mechanism.

One point to note here is that Executor does not strictly require the task execution to be asynchronous.

这里需要注意的一点是,执行器并不严格要求任务执行是异步的。

In the simplest case, an executor can invoke the submitted task instantly in the invoking thread.

在这个简单的例子上, 执行者可以在调用线程中立即(instantly 即刻)调用提交的任务。

We need to create an invoker to create the executor instance:

我们需要生成一个调用者去生成一个执行者实例

Java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class MyExecutor {
public static void main(String[] args) {
System.out.println("Begin execute");
execute();
}

public static void execute() {
Executor executor = new Invoker();
executor.execute(
() -> { // task to be performed 准备执行任务
System.out.println("Hello Executor");
});
}
}

class Invoker implements Executor {
// We need to create an invoker to create the executor instance:
// 我们需要创建一个调用程序来创建执行程序实例:
@Override
public void execute(Runnable runnable) {
runnable.run();
}
}

看到上述的案例,想必你想问,那我直接print(“Hello Executor”)不行吗???(其实可以作为executor的生成案例) 我也正疑惑呢 so 得找个好案例

Executor

ExecutorService(执行器服务)

ExecutorService is a complete solution for asynchronous processing.(执行器服务是异步进程的一个完整解决方案) It manages an in-memory queue and schedules submitted tasks based on thread availability.

它管理着内存中的队列(什么队列???🤔 🤨)并根据线程可用性安排已提交的任务。

To use ExecutorService, we need to create one Runnable class.

Java

1
2
3
4
5
6
public class Task implements Runnable {
@Override
public void run() {
// task details
}
}

Now we can create the ExecutorService instance and assign this task. At the time of creation,

下面我们开始生成一个ExecutorService 实例并且分配一些任务,在生成的时候,我们需要指定线程池的大小

we need to specify the thread-pool size.

Java

1
ExecutorService executor = Executors.newFixedThreadPool(10);

If we want to create a single-threaded ExecutorService instance, we can use

如果我们想创建一个单线程的执行器服务实例,我们可以使用()去生成一个实例

newSingleThreadExecutor(ThreadFactory threadFactory)\ to create the instance.

Once the executor is created, we can use it to submit the task.

一旦这个执行者被生成,我们就可以使用它去提交任务

Java

1
2
3
public void execute() { 
executor.submit(new Task());
}

We can also create the Runnable instance while submitting the task.

我们还可以在提交任务时创建Runnable实例。

Java

1
2
3
executor.submit(() -> {
new Task();
});

It also comes with two out-of-the-box execution termination methods.

它也有两种现成(out of the box)的执行终止方法

The first one is shutdown(); it waits until all the submitted tasks finish executing.

等待所有的提交任务执行完毕

The other method is shutdownNow() which immediately terminates all the pending/executing tasks.

该方法立即终止(terminates)所有待处理/正在执行的任务。

There is also another method awaitTermination(long timeout, TimeUnit unit)

which forcefully blocks until all tasks have completed execution after a shutdown event triggered

or execution-timeout occurred,

在触发关闭事件或发生执行超时后或者这个执行线程本身中断,它会强制阻塞直到所有任务都已完成执行,

or the execution thread itself is interrupted(中断),

Java

1
2
3
4
5
try {
executor.awaitTermination( 20l, TimeUnit.NANOSECONDS );
} catch (InterruptedException e) {
e.printStackTrace();
}

这个executorService还是有点懵逼🤬

ScheduledExecutorService(定时调度机制)

ScheduledExecutorService is a similar interface to ExecutorService, but it can perform tasks periodically.

xxx是类似于执行器服务的一个接口,但是它可定期执行任务,So you know that

Executor and ExecutorService\‘s methods are scheduled on the spot without introducing any artificial delay.

XXX的方法是立即运行 没有引入任何人为的延迟

Zero or any negative value signifies that the request needs to be executed instantly.

零和任何负值都表示这个请求需要立即执行

We can use both Runnable and Callable interface to define the task.

define 定义

Java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void execute() {
ScheduledExecutorService executorService
= Executors.newSingleThreadScheduledExecutor();

Future<String> future = executorService.schedule(() -> {
// ...
return "Hello world";
}, 1, TimeUnit.SECONDS);

ScheduledFuture<?> scheduledFuture = executorService.schedule(() -> {
// ...
}, 1, TimeUnit.SECONDS);

executorService.shutdown();
}

ScheduledExecutorService can also schedule the task after some given fixed delay:

也可预定任务在一定延后

Java

1
2
3
4
5
6
7
executorService.scheduleAtFixedRate(() -> {
// ...
}, 1, 10, TimeUnit.SECONDS);

executorService.scheduleWithFixedDelay(() -> {
// ...
}, 1, 10, TimeUnit.SECONDS);

Here, the scheduleAtFixedRate( Runnable command, long initialDelay(初始延迟), long period(时间), TimeUnit unit )\ method creates and executes a periodic action that is invoked firstly after the provided initial delay, and subsequently with the given period until the service instance shutdowns.

创建并执行一个周期性操作,该操作将在提供的初始延迟后首先调用,然后以给定的周期调用,直到服务实例关闭为止。

The scheduleWithFixedDelay( Runnable command, long initialDelay, long delay, TimeUnit unit )\ method creates and executes a periodic action that is invoked firstly after the provided initial delay, and repeatedly with the given delay between the termination of the executing one and the invocation of the next one.

创建并执行一个周期性操作,该周期性操作会在提供的初始延迟后首先调用,并在执行的终止与下一个调用的调用之间以给定的延迟重复进行。

Future()

Future\ is used to represent the result of an asynchronous operation.

用于表示异步操作的结果

It comes with methods for checking if the asynchronous operation is completed or not, getting the computed result, etc.

它包含检查异步操作是否完成,获取计算结果等的方法。

What’s more, the cancel(boolean mayInterruptIfRunning) API cancels the operation and releases the executing thread.

此外XXX API会取消操作并释放执行线程

If the value of mayInterruptIfRunning is true, the thread executing the task will be terminated instantly.

如果mayInterruptIfRunning的值为true,则执行任务的线程将立即终止。

Otherwise, in-progress tasks will be allowed to complete.

否则,将允许正在进行的任务完成。

We can use below code snippet to create a future instance:

Java

1
2
3
4
5
6
7
8
9
public void invoke() {
ExecutorService executorService = Executors.newFixedThreadPool(10);

Future<String> future = executorService.submit(() -> {
// ...
Thread.sleep(10000l);
return "Hello world";
});
}

We can use following code snippet to check if the future result is ready and fetch the data if the computation is done:

Java

1
2
3
4
5
6
7
if (future.isDone() && !future.isCancelled()) {
try {
str = future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}

We can also specify a timeout for a given operation. If the task takes more than this time, a TimeoutException is thrown:

Java

1
2
3
4
5
try {
future.get(10, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
}

CountDownLatch(闭锁)

CountDownLatch (introduced in JDK 5) is a utility class which blocks a set of threads until some operation completes.

一个实用程序类,它将阻止一组线程,直到完成某些操作为止。

CountDownLatch用一个计数器(整数类型)初始化;随着从属线程完成执行,此计数器递减。

A CountDownLatch is initialized with a counter(Integer type);

用一个计数器初始化

this counter decrements as the dependent threads complete execution.

这个计数器随着从属线程完成执行而递减

But once the counter reaches zero, other threads get released.

但是一旦计数器达到零,其他线程就会被释放。

You can learn more about CountDownLatch here.

您可以在此处了解有关CountDownLatch的更多信息。

CyclicBarrier栅栏

CyclicBarrier works almost the same as CountDownLatch except that we can reuse it.

XX的功能几乎和XXX相同,除了我们可以重复使用他

Unlike CountDownLatch, it allows multiple threads to wait for each other using await() method(known as barrier condition) before invoking the final task.

不像XXX 在调用最终的任务之前,它允许多个线程使用await()方法互相等待(称为屏障条件)

栅栏类似于闭锁,它能阻塞一组线程直到某个事件的发生。栅栏与闭锁的关键区别在于,所有的线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件,而栅栏用于等待其他线程。

We need to create a Runnable task instance to initiate the barrier condition:

启动屏障时我们需要创建一个XXX任务实例

Java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class Task implements Runnable {

private CyclicBarrier barrier;

public Task(CyclicBarrier barrier) {
this.barrier = barrier;
}

@Override
public void run() {
try {
LOG.info(Thread.currentThread().getName() +
" is waiting");
barrier.await();
LOG.info(Thread.currentThread().getName() +
" is released");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}

}

Now we can invoke some threads to race for the barrier condition:

现在我们可以调用一些线程来竞争屏障条件:

Java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void start() {

CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
// ...
LOG.info("All previous tasks are completed");
});

Thread t1 = new Thread(new Task(cyclicBarrier), "T1");
Thread t2 = new Thread(new Task(cyclicBarrier), "T2");
Thread t3 = new Thread(new Task(cyclicBarrier), "T3");

if (!cyclicBarrier.isBroken()) {
t1.start();
t2.start();
t3.start();
}
}

Here, the isBroken() method checks if any of the threads got interrupted during the execution time. We should always perform this check before performing the actual process.

isBroken()方法检查在执行期间是否有任何线程被中断。在执行实际过程之前,我们应始终执行此检查。

Semaphore(多元信号量)

tips:Binary Semaphore(二元信号量) 是最简单的一种锁

The Semaphore is used for blocking thread level access to some part of the physical or logical resource. A semaphore contains a set of permits; whenever a thread tries to enter the critical section, it needs to check the semaphore if a permit is available or not.

信号量用于阻止线程级别访问物理或逻辑资源的某些部分。信号量包含了一组许可证;每当线程尝试进入关键部分时,它都需要检查信号量,以查看是否有许可。

If a permit is not available (via *tryAcquire()*), the thread is not allowed to jump into the critical section; however, if the permit is available the access is granted, and the permit counter decreases.

如果没有许可证(通过tryAcquire()),则不允许该线程跳转到关键部分;但是,如果许可证可用,则授予访问权限,并且许可证计数器减少。

Once the executing thread releases the critical section, again the permit counter increases (done by release() method).

一旦执行线程释放了关键部分,许可计数器就会再次增加(由release()方法完成)

We can specify a timeout for acquiring access by using the tryAcquire(long timeout, TimeUnit unit) method.

我们可以使用tryAcquire(long timeout,TimeUnit unit)方法去指定用于获取访问权限的超时。

We can also check the number of available permits or the number of threads waiting to acquire the semaphore.

我们还可以检查可用许可数或等待获取信号量的线程数。

ex: Semaphore是一种基于计数的信号量。它可以设定一个阈值,基于此,多个线程竞争获取许可信号,做完自己的申请后归还,超过阈值后,线程申请许可信号将会被阻塞。Semaphore可以用来构建一些对象池,资源池之类的,比如数据库连接池,我们也可以创建计数为1的Semaphore,将其作为一种类似互斥锁的机制,这也叫二元信号量,表示两种互斥状态

Following code snippet can be used to implement a semaphore:

下面的代码块可以用来实现或继承semaphore

Java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static Semaphore semaphore = new Semaphore(10);

public void execute() throws InterruptedException {

LOG.info("Available permit : " + semaphore.availablePermits());
LOG.info("Number of threads waiting to acquire: " +
semaphore.getQueueLength());

if (semaphore.tryAcquire()) {
try {
// ...
}
finally {
semaphore.release();
}
}

}

We can implement a Mutex like data-structure using Semaphore. More details on this can be found here.我们可以使用信号量实现类似Mutex的数据结构

ThreadFactory(线程工厂)

As the name suggests(顾名思义), ThreadFactory acts as a thread (non-existing) pool which creates a new thread on demand.

顾名思义,ThreadFactory充当线程(不存在)的池,可根据需要创建新线程。

It eliminates the need of a lot of boilerplate coding for implementing efficient thread creation mechanisms.

它消除了用于实现有效的线程创建机制的大量样板代码的需求。

We can define(定义) a ThreadFactory:

Java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class BaeldungThreadFactory implements ThreadFactory {
private int threadId;
private String name;

public BaeldungThreadFactory(String name) {
threadId = 1;
this.name = name;
}

@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, name + "-Thread_" + threadId);
LOG.info("created new thread with id : " + threadId +
" and name : " + t.getName());
threadId++;
return t;
}
}

We can use this newThread(Runnable r) method to create a new thread at runtime:

在程序运行时我们可以使用XXX去创建一个新的线程

Java

1
2
3
4
5
6
BaeldungThreadFactory factory = new BaeldungThreadFactory( 
"BaeldungThreadFactory");
for (int i = 0; i < 10; i++) {
Thread t = factory.newThread(new Task());
t.start();
}

BlockingQueue(阻塞队列)

In asynchronous programming, one of the most common integration patterns is the producer-consumer pattern.

The java.util.concurrent package comes with a data-structure know as BlockingQueue – which can be very useful in these async scenarios.

More information and a working example on this is available here.

在异步编程中,最常见的集成模式之一是生产者-消费者模式。

java.util.concurrent包附带一个称为BlockingQueue的数据结构–在这些异步情况下非常有用。 此处提供了更多信息和工作示例。

DelayQueue

DelayQueue is an infinite-size blocking queue of elements where an element can only be pulled if it’s expiration time (known as user defined delay) is completed. Hence, the topmost element (head) will have the most amount delay and it will be polled last.

More information and a working example on this is available here.

DelayQueue是元素的无限大小的阻塞队列,在该队列中,只有在元素的到期时间(称为用户定义的延迟)完成后才能将其拉出。因此,最上面的元素(头)将具有最大的延迟量,并且将在最后轮询。

Locks

Not surprisingly, Lock is a utility for blocking other threads from accessing a certain segment of code, apart from the thread that’s executing it currently.

The main difference between a Lock and a Synchronized block is that synchronized block is fully contained in a method; however, we can have Lock API’s lock() and unlock() operation in separate methods.

More information and a working example on this is available here.

毫不奇怪,Lock是一种实用程序,用于阻止其他线程访问代码的特定部分(当前正在执行该代码的线程除外)。 Lock和Synchronized块之间的主要区别在于,同步块完全包含在方法中。但是,我们可以在单独的方法中使用Lock API的lock()和unlock()操作。

Phaser

Phaser is a more flexible solution than CyclicBarrier and CountDownLatch – used to act as a reusable barrier on which the dynamic number of threads need to wait before continuing execution. We can coordinate multiple phases of execution, reusing a Phaser instance for each program phase.

More information and a working example on this is available here.

与CyclicBarrier和CountDownLatch相比,Phaser是一种更加灵活的解决方案– CyclicBarrier和CountDownLatch用作可重复使用的屏障,动态数量的线程需要等待该屏障才能继续执行。我们可以协调执行的多个阶段,为每个程序阶段重用一个Phaser实例。

Conclusion(结论)

In this high-level, overview article, we’ve focused on the different utilities available of java.util.concurrent package.

As always, the full source code is available over on GitHub.