Spring框架中的線程池
原文合集地址如下,有需要的朋友可以關(guān)注
[本文地址](https://mp.weixin.qq.com/s/FCeSlVNImbxKU6Be_YJA1A)
[合集地址](https://mp.weixin.qq.com/mp/appmsgalbum?__biz=MzI5MjY4OTQ2Nw==&action=getalbum&album_id=2970034039967449094&scene=173&from_msgid=2247484208&from_itemidx=1&count=3&nolastread=1#wechat_redirect)
# Spring框架中的線程池
## 使用Java的ExecutorService接口實(shí)現(xiàn)
`ExecutorService`是Java提供的用于管理線程池的高級(jí)工具。
下面是在Spring框架中使用線程池的一般步驟:
### 導(dǎo)入所需的依賴
首先,確保你的項(xiàng)目中包含了使用線程池所需的依賴。通常情況下,你可以使用Spring Boot來創(chuàng)建項(xiàng)目,它會(huì)自動(dòng)包含線程池相關(guān)的依賴。
### 創(chuàng)建線程池
在Spring中,你可以通過配置文件或使用Java代碼來創(chuàng)建線程池。如果你使用配置文件,可以在`application.properties`或`application.yml`文件中配置線程池的屬性。如果你使用Java代碼,可以通過創(chuàng)建`ThreadPoolTaskExecutor`或`ThreadPoolExecutor`對(duì)象來實(shí)現(xiàn)。
? ?```java
? ?// 通過Java代碼創(chuàng)建線程池
? ?@Bean
? ?public ExecutorService taskExecutor() {
? ? ? ?ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
? ? ? ?executor.setCorePoolSize(10); // 設(shè)置核心線程數(shù)
? ? ? ?executor.setMaxPoolSize(20); // 設(shè)置最大線程數(shù)
? ? ? ?executor.setQueueCapacity(30); // 設(shè)置隊(duì)列容量
? ? ? ?executor.setThreadNamePrefix("MyThread-"); // 設(shè)置線程名前綴
? ? ? ?executor.initialize();
? ? ? ?return executor;
? ?}
? ?```
### 在需要使用線程池的地方注入它
在你的代碼中,如果你需要使用線程池來執(zhí)行異步任務(wù)或并發(fā)處理,你可以通過在需要使用的地方注入線程池來實(shí)現(xiàn)。
? ?```java
? ?@Autowired
? ?private ExecutorService taskExecutor;
? ?```
### 提交任務(wù)給線程池
一旦你注入了線程池,你就可以使用它來提交任務(wù)。
? ?```java
? ?taskExecutor.execute(() -> {
? ? ? ?// 在這里編寫你的任務(wù)邏輯
? ?});
? ?```
? ?或者,如果你需要獲取任務(wù)的結(jié)果,你可以使用`submit()`方法。
? ?```java
? ?Future<Result> future = taskExecutor.submit(() -> {
? ? ? ?// 在這里編寫你的任務(wù)邏輯,并返回一個(gè)結(jié)果
? ? ? ?return result;
? ?});
? ?```
### 銷毀線程池
在Spring應(yīng)用程序關(guān)閉時(shí),確保銷毀線程池以釋放資源。
? ?```java
? ?@PreDestroy
? ?public void shutdown() {
? ? ? ?if (taskExecutor instanceof ExecutorService) {
? ? ? ? ? ?((ExecutorService) taskExecutor).shutdown();
? ? ? ?}
? ?}
? ?```
以上是在Spring框架中使用線程池的基本步驟。你可以根據(jù)自己的需求和具體的場(chǎng)景來配置線程池的屬性和使用方式。
## 使用@Async注解使用異步線程池執(zhí)行任務(wù)
在Spring框架中,使用`@Async`注解可以將方法標(biāo)記為異步執(zhí)行的方法,使其在調(diào)用時(shí)會(huì)被自動(dòng)提交到線程池中執(zhí)行,而不會(huì)阻塞主線程。
以下是使用`@Async`注解的步驟:
### 導(dǎo)入所需的依賴
確保你的項(xiàng)目中包含了使用`@Async`注解所需的依賴。通常情況下,你可以使用Spring Boot來創(chuàng)建項(xiàng)目,它會(huì)自動(dòng)包含相關(guān)的依賴。
### 啟用異步支持
在Spring配置中,你需要啟用異步支持。如果你使用Java配置方式,可以在配置類上加上`@EnableAsync`注解。如果你使用XML配置方式,可以在XML配置文件中添加`<task:annotation-driven executor="taskExecutor" />`。
### 在需要異步執(zhí)行的方法上添加`@Async`注解
將`@Async`注解添加到你希望異步執(zhí)行的方法上。
? ?```java
? ?@Async
? ?public void asyncMethod() {
? ? ? ?// 異步執(zhí)行的方法邏輯
? ?}
? ?```
### 配置線程池并指定任務(wù)執(zhí)行器名稱(可選)
Spring框架默認(rèn)使用`SimpleAsyncTaskExecutor`作為任務(wù)執(zhí)行器,但你也可以自定義線程池并指定任務(wù)執(zhí)行器的名稱。在配置類中創(chuàng)建一個(gè)`TaskExecutor`的bean,并通過`@Async`注解的`value`屬性或`Executor`參數(shù)指定任務(wù)執(zhí)行器的名稱。
? ?```java
? ?@Bean("taskExecutor")
? ?public Executor taskExecutor() {
? ? ? ?ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
? ? ? ?executor.setCorePoolSize(10);
? ? ? ?executor.setMaxPoolSize(20);
? ? ? ?executor.setQueueCapacity(30);
? ? ? ?executor.setThreadNamePrefix("MyThread-");
? ? ? ?executor.initialize();
? ? ? ?return executor;
? ?}
? ?// 使用@Async注解并指定任務(wù)執(zhí)行器名稱
? ?@Async("taskExecutor")
? ?public void asyncMethodWithCustomExecutor() {
? ? ? ?// 異步執(zhí)行的方法邏輯
? ?}
? ?```
#### 關(guān)于SimpleAsyncTaskExecutor
`SimpleAsyncTaskExecutor`是Spring框架提供的默認(rèn)的任務(wù)執(zhí)行器,它是基于Java的`Executor`接口實(shí)現(xiàn)的簡(jiǎn)單的線程池。
要配置`SimpleAsyncTaskExecutor`,你可以在Spring的配置類中創(chuàng)建一個(gè)`TaskExecutor`的bean,并將其返回。以下是一個(gè)示例:
```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
@Configuration
public class AppConfig {
? ? @Bean
? ? public TaskExecutor taskExecutor() {
? ? ? ? SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
? ? ? ? executor.setConcurrencyLimit(10); // 設(shè)置并發(fā)限制,默認(rèn)為無限制
? ? ? ? executor.setThreadNamePrefix("MyThread-"); // 設(shè)置線程名前綴
? ? ? ? return executor;
? ? }
}
```
在上述示例中,我們創(chuàng)建了一個(gè)`TaskExecutor`的bean,并將其返回。`SimpleAsyncTaskExecutor`的實(shí)例被創(chuàng)建,并通過`setConcurrencyLimit()`方法設(shè)置了并發(fā)限制,這是可選的,默認(rèn)為無限制。`setThreadNamePrefix()`方法設(shè)置了線程名的前綴,以便于識(shí)別任務(wù)執(zhí)行的線程。
你可以根據(jù)需要進(jìn)行其他配置,`SimpleAsyncTaskExecutor`還提供了其他方法,例如`setThreadGroupName()`、`setDaemon()`等,可以根據(jù)具體需求進(jìn)行設(shè)置。
在使用`SimpleAsyncTaskExecutor`時(shí),它會(huì)根據(jù)需要?jiǎng)?chuàng)建新的線程來執(zhí)行任務(wù),不會(huì)重用線程。因此,它適用于短期、簡(jiǎn)單的異步任務(wù),但對(duì)于長(zhǎng)期運(yùn)行的任務(wù)或需要復(fù)雜線程池配置的場(chǎng)景,可能需要使用其他實(shí)現(xiàn),如`ThreadPoolTaskExecutor`。
請(qǐng)注意,`SimpleAsyncTaskExecutor`不適合高負(fù)載的應(yīng)用程序,因?yàn)樗鼪]有線程池的管理機(jī)制,并且沒有對(duì)隊(duì)列容量、拒絕策略等進(jìn)行配置。如果你的應(yīng)用程序需要更高級(jí)的線程池管理功能,建議使用`ThreadPoolTaskExecutor`或其他支持更多配置選項(xiàng)的任務(wù)執(zhí)行器。
### 關(guān)于ThreadPoolTaskExecutor
`ThreadPoolTaskExecutor`是Spring框架提供的一個(gè)基于Java的`ThreadPoolExecutor`的封裝,它提供了更多的線程池管理功能和配置選項(xiàng)。
要配置`ThreadPoolTaskExecutor`,你可以在Spring的配置類中創(chuàng)建一個(gè)`TaskExecutor`的bean,并進(jìn)行相應(yīng)的配置。以下是一個(gè)示例:
```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.core.task.TaskExecutor;
@Configuration
public class AppConfig {
? ? @Bean
? ? public TaskExecutor taskExecutor() {
? ? ? ? ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
? ? ? ? executor.setCorePoolSize(10); // 設(shè)置核心線程數(shù)
? ? ? ? executor.setMaxPoolSize(20); // 設(shè)置最大線程數(shù)
? ? ? ? executor.setQueueCapacity(30); // 設(shè)置隊(duì)列容量
? ? ? ? executor.setThreadNamePrefix("MyThread-"); // 設(shè)置線程名前綴
? ? ? ? executor.initialize();
? ? ? ? return executor;
? ? }
}
```
在上述示例中,我們創(chuàng)建了一個(gè)`TaskExecutor`的bean,并將其返回。`ThreadPoolTaskExecutor`的實(shí)例被創(chuàng)建,并通過一系列的`set`方法進(jìn)行配置,包括:
- `setCorePoolSize(int corePoolSize)`: 設(shè)置核心線程數(shù),即線程池中保持的線程數(shù)量。
- `setMaxPoolSize(int maxPoolSize)`: 設(shè)置最大線程數(shù),即線程池允許的最大線程數(shù)量。
- `setQueueCapacity(int queueCapacity)`: 設(shè)置隊(duì)列容量,即線程池任務(wù)隊(duì)列的最大容量。
- `setThreadNamePrefix(String threadNamePrefix)`: 設(shè)置線程名的前綴,以便于識(shí)別任務(wù)執(zhí)行的線程。
- `initialize()`: 初始化線程池。
除了上述方法,`ThreadPoolTaskExecutor`還提供了其他配置選項(xiàng),如`setKeepAliveSeconds()`、`setAllowCoreThreadTimeOut()`、`setRejectedExecutionHandler()`等,可以根據(jù)具體需求進(jìn)行設(shè)置。
配置完成后,你可以將`ThreadPoolTaskExecutor`用作任務(wù)執(zhí)行器,通過在`@Async`注解中指定任務(wù)執(zhí)行器的名稱或通過注入使用它。
請(qǐng)注意,`ThreadPoolTaskExecutor`提供了線程池的管理功能,可以根據(jù)任務(wù)的情況自動(dòng)調(diào)整線程池的大小,對(duì)于長(zhǎng)期運(yùn)行的異步任務(wù)或需要更高級(jí)的線程池管理的場(chǎng)景,它是更常用的選擇。
### 調(diào)用異步方法
在其他組件或類中,直接調(diào)用帶有`@Async`注解的異步方法即可。
? ?```java
? ?myService.asyncMethod();
? ?```
`@Async`注解可以在后面加入一個(gè)名字,用于指定具體的任務(wù)執(zhí)行器,如上面的示例中的`@Async("taskExecutor")`。這樣做的好處是,你可以針對(duì)不同的異步任務(wù)使用不同的線程池或任務(wù)執(zhí)行器。
注意事項(xiàng):
- 異步方法必須定義在Spring管理的Bean中,因?yàn)镾pring會(huì)通過代理來攔截方法調(diào)用并將其提交給線程池。
- 使用`@Async`注解時(shí),如果異步方法內(nèi)部發(fā)生了異常,調(diào)用者不會(huì)收到異常,因?yàn)楫惒椒椒ㄔ讵?dú)立的線程中執(zhí)行。若需要捕獲異常,可以使用`Future`來獲取異步任務(wù)的執(zhí)行結(jié)果并處理異常。
- 確保在Spring的配置中啟用了異步支持,否則`@Async`注解將不起作用。
這些是使用`@Async`注解實(shí)現(xiàn)異步方法的基本步驟,可以根據(jù)具體需求進(jìn)行配置和使用。
## @Async的實(shí)現(xiàn)原理
@Async注解的實(shí)現(xiàn)原理使用了AOP、動(dòng)態(tài)代理、Java反射、線程池、Runnable和Callable接口、Future接口、Java并發(fā)類庫(kù)等技術(shù)和類庫(kù),通過這些技術(shù)和類庫(kù)的組合,實(shí)現(xiàn)了異步方法的調(diào)用和執(zhí)行。
### 異步方法的代理生成
當(dāng)Spring掃描到帶有@Async注解的方法時(shí),它會(huì)生成一個(gè)代理對(duì)象來封裝該方法。
以下是Spring框架中生成代理對(duì)象的代碼示例:
```java
public class AsyncAnnotationBeanPostProcessor implements BeanPostProcessor, Ordered {
? ? // ...
? ? @Override
? ? public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
? ? ? ? Class<?> targetClass = AopUtils.getTargetClass(bean);
? ? ? ? for (Method method : targetClass.getMethods()) {
? ? ? ? ? ? Async asyncAnnotation = AnnotationUtils.findAnnotation(method, Async.class);
? ? ? ? ? ? if (asyncAnnotation != null) {
? ? ? ? ? ? ? ? // 創(chuàng)建代理對(duì)象
? ? ? ? ? ? ? ? Object proxy = createAsyncProxy(bean, beanName, method);
? ? ? ? ? ? ? ? return proxy;
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? return bean;
? ? }
? ? private Object createAsyncProxy(Object bean, String beanName, Method method) {
? ? ? ? ProxyFactory proxyFactory = new ProxyFactory();
? ? ? ? proxyFactory.setTarget(bean);
? ? ? ? proxyFactory.addAdvisor(asyncAdvisor);
? ? ? ? proxyFactory.setFrozen(true);
? ? ? ? proxyFactory.setProxyTargetClass(true);
? ? ? ? return proxyFactory.getProxy();
? ? }
? ? // ...
}
```
在上述代碼中,`AsyncAnnotationBeanPostProcessor`實(shí)現(xiàn)了`BeanPostProcessor`接口,用于在Bean初始化之前對(duì)帶有`@Async`注解的方法進(jìn)行處理。當(dāng)Spring掃描到帶有`@Async`注解的方法時(shí),`postProcessBeforeInitialization()`方法會(huì)被調(diào)用。
在`postProcessBeforeInitialization()`方法中,首先通過`AopUtils.getTargetClass(bean)`獲取目標(biāo)類的類型,然后遍歷目標(biāo)類的所有方法。對(duì)于每個(gè)方法,使用`AnnotationUtils.findAnnotation(method, Async.class)`查找是否有`@Async`注解。
如果找到了帶有`@Async`注解的方法,就會(huì)調(diào)用`createAsyncProxy()`方法創(chuàng)建代理對(duì)象。在`createAsyncProxy()`方法中,通過`ProxyFactory`創(chuàng)建一個(gè)代理工廠,并設(shè)置目標(biāo)對(duì)象、添加`asyncAdvisor`(異步處理的通知器),以及其他配置。最后調(diào)用`proxyFactory.getProxy()`獲取代理對(duì)象,并將其返回。
通過以上代碼,Spring框架在掃描到帶有`@Async`注解的方法時(shí),會(huì)生成一個(gè)代理對(duì)象來封裝該方法,實(shí)現(xiàn)異步調(diào)用的功能。這樣,在調(diào)用帶有`@Async`注解的方法時(shí),實(shí)際上是通過代理對(duì)象來調(diào)用的,從而實(shí)現(xiàn)了異步執(zhí)行。
### 任務(wù)執(zhí)行器的選擇
根據(jù)@Async注解的配置,Spring會(huì)根據(jù)指定的任務(wù)執(zhí)行器名稱選擇相應(yīng)的任務(wù)執(zhí)行器。如果沒有指定名稱,它將使用默認(rèn)的任務(wù)執(zhí)行器。
在Spring中,根據(jù)`@Async`注解的配置選擇任務(wù)執(zhí)行器的過程涉及以下幾個(gè)關(guān)鍵的源碼部分:
#### `AsyncAnnotationBeanPostProcessor`類
該類是一個(gè)`BeanPostProcessor`,用于處理帶有`@Async`注解的方法。在方法`postProcessBeforeInitialization()`中,它會(huì)檢查方法上的`@Async`注解的配置,并根據(jù)配置選擇任務(wù)執(zhí)行器。
#### `AsyncConfigurer`接口
該接口定義了配置任務(wù)執(zhí)行器的方法。你可以通過實(shí)現(xiàn)該接口來自定義任務(wù)執(zhí)行器的配置。其中,`getAsyncExecutor()`方法用于返回一個(gè)`Executor`對(duì)象,即任務(wù)執(zhí)行器。
#### `AnnotationAsyncExecutionInterceptor`類
該類是一個(gè)AOP的`MethodInterceptor`,用于在調(diào)用帶有`@Async`注解的方法時(shí)進(jìn)行攔截。在`invoke()`方法中,它會(huì)檢查方法上的`@Async`注解的配置,并從`AsyncAnnotationInterceptor`中獲取任務(wù)執(zhí)行器。
#### `AsyncAnnotationInterceptor`類
該類是一個(gè)AOP的`MethodInterceptor`,用于處理帶有`@Async`注解的方法。在`invoke()`方法中,它會(huì)獲取任務(wù)執(zhí)行器,并使用任務(wù)執(zhí)行器來執(zhí)行異步任務(wù)。
#### 相關(guān)的源碼示例
```java
public class AsyncAnnotationBeanPostProcessor implements BeanPostProcessor, Ordered {
? ? private AsyncConfigurer asyncConfigurer;
? ? @Override
? ? public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
? ? ? ? // ...
? ? ? ? // 根據(jù)@Async注解的配置獲取任務(wù)執(zhí)行器
? ? ? ? Executor executor = getExecutor(asyncAnnotation);
? ? ? ? // ...
? ? ? ? // 創(chuàng)建代理對(duì)象并設(shè)置任務(wù)執(zhí)行器
? ? ? ? Object proxy = createAsyncProxy(bean, beanName, method, executor);
? ? ? ? // ...
? ? }
? ? private Executor getExecutor(Async asyncAnnotation) {
? ? ? ? // 從AsyncConfigurer中獲取任務(wù)執(zhí)行器
? ? ? ? Executor executor = this.asyncConfigurer.getAsyncExecutor();
? ? ? ? if (executor == null && this.asyncConfigurer instanceof ApplicationContextAware) {
? ? ? ? ? ? executor = ((ApplicationContextAware) this.asyncConfigurer).getApplicationContext()
? ? ? ? ? ? ? ? ? ? .getBean(TaskExecutor.class);
? ? ? ? }
? ? ? ? if (executor == null) {
? ? ? ? ? ? throw new IllegalStateException("No executor specified for async annotation processing");
? ? ? ? }
? ? ? ? return executor;
? ? }
? ? // ...
? ? private Object createAsyncProxy(Object bean, String beanName, Method method, Executor executor) {
? ? ? ? ProxyFactory proxyFactory = new ProxyFactory();
? ? ? ? proxyFactory.setTarget(bean);
? ? ? ? proxyFactory.addAdvisor(asyncAdvisor);
? ? ? ? // 設(shè)置任務(wù)執(zhí)行器
? ? ? ? proxyFactory.addAdvice(new AnnotationAsyncExecutionInterceptor(executor));
? ? ? ? proxyFactory.setFrozen(true);
? ? ? ? proxyFactory.setProxyTargetClass(true);
? ? ? ? return proxyFactory.getProxy();
? ? }
? ? // ...
}
public class AnnotationAsyncExecutionInterceptor implements MethodInterceptor {
? ? private final Executor executor;
? ? public AnnotationAsyncExecutionInterceptor(Executor executor) {
? ? ? ? this.executor = executor;
? ? }
? ? @Override
? ? public Object invoke(MethodInvocation invocation) throws Throwable {
? ? ? ? // 獲取目標(biāo)方法上的@Async注解的配置信息
? ? ? ? // ...
? ? ? ? // 執(zhí)行異步任務(wù)
? ? ? ? Future<?> future = executor.submit(task);
? ? ? ? // ...
? ? }
? ? // ...
}
```
在上述代碼中,`AsyncAnnotationBeanPostProcessor`類通過`getExecutor()`方法獲取任務(wù)執(zhí)行器,該方法會(huì)優(yōu)先從`AsyncConfigurer`中獲取執(zhí)行
器,如果未配置,則從Spring容器中獲取`TaskExecutor` bean。然后,在創(chuàng)建代理對(duì)象時(shí),將任務(wù)執(zhí)行器傳遞給`AnnotationAsyncExecutionInterceptor`。
`AnnotationAsyncExecutionInterceptor`類的`invoke()`方法中,使用獲取到的任務(wù)執(zhí)行器來執(zhí)行異步任務(wù),通過`executor.submit(task)`將任務(wù)提交給執(zhí)行器。
通過上述源碼,當(dāng)Spring掃描到帶有`@Async`注解的方法時(shí),會(huì)根據(jù)配置獲取任務(wù)執(zhí)行器,并將其傳遞給代理對(duì)象,在執(zhí)行異步方法時(shí)使用相應(yīng)的任務(wù)執(zhí)行器。這樣就可以根據(jù)配置選擇不同的任務(wù)執(zhí)行器來處理異步方法的調(diào)用。
### 方法調(diào)用的封裝
代理對(duì)象在調(diào)用異步方法時(shí),會(huì)將該方法的執(zhí)行請(qǐng)求封裝成一個(gè)Runnable或Callable的任務(wù),并提交給任務(wù)執(zhí)行器。任務(wù)執(zhí)行器會(huì)負(fù)責(zé)管理線程池,并在合適的時(shí)間執(zhí)行異步任務(wù)。
在Spring中,代理對(duì)象在調(diào)用異步方法時(shí),會(huì)將該方法的執(zhí)行請(qǐng)求封裝成一個(gè)`Runnable`或`Callable`的任務(wù),并提交給任務(wù)執(zhí)行器。這個(gè)過程涉及以下關(guān)鍵的源碼部分:
#### `AnnotationAsyncExecutionInterceptor`類
該類是一個(gè)AOP的`MethodInterceptor`,用于處理帶有`@Async`注解的方法。在`invoke()`方法中,它會(huì)將異步方法的執(zhí)行請(qǐng)求封裝成`Runnable`或`Callable`的任務(wù)對(duì)象。
#### `AsyncExecutionInterceptor.AsyncExecutionRunnable`類和`AsyncExecutionInterceptor.AsyncExecutionCallable`類
這兩個(gè)類分別實(shí)現(xiàn)了`Runnable`和`Callable`接口,用于封裝異步方法的執(zhí)行邏輯。
#### `TaskExecutor`接口和其實(shí)現(xiàn)類
Spring提供了多個(gè)實(shí)現(xiàn)了`TaskExecutor`接口的任務(wù)執(zhí)行器,例如`ThreadPoolTaskExecutor`、`SimpleAsyncTaskExecutor`等。任務(wù)執(zhí)行器負(fù)責(zé)管理線程池,并執(zhí)行提交的任務(wù)。
#### 相關(guān)源碼示例
```java
public class AnnotationAsyncExecutionInterceptor implements MethodInterceptor {
? ? private final Executor executor;
? ? public AnnotationAsyncExecutionInterceptor(Executor executor) {
? ? ? ? this.executor = executor;
? ? }
? ? @Override
? ? public Object invoke(MethodInvocation invocation) throws Throwable {
? ? ? ? // 獲取目標(biāo)方法上的@Async注解的配置信息
? ? ? ? // ...
? ? ? ? // 封裝異步任務(wù)
? ? ? ? Runnable task = new AsyncExecutionRunnable(executor, asyncAnnotation, invocation);
? ? ? ? // 提交異步任務(wù)給任務(wù)執(zhí)行器
? ? ? ? executor.execute(task);
? ? ? ? // 如果有返回值,則返回Future對(duì)象
? ? ? ? if (hasReturnType) {
? ? ? ? ? ? // 創(chuàng)建并返回Future對(duì)象
? ? ? ? ? ? return createFutureResult(asyncAnnotation, task);
? ? ? ? } else {
? ? ? ? ? ? return null;
? ? ? ? }
? ? }
? ? // ...
}
public class AsyncExecutionRunnable implements Runnable {
? ? private final Executor executor;
? ? private final Async asyncAnnotation;
? ? private final MethodInvocation invocation;
? ? public AsyncExecutionRunnable(Executor executor, Async asyncAnnotation, MethodInvocation invocation) {
? ? ? ? this.executor = executor;
? ? ? ? this.asyncAnnotation = asyncAnnotation;
? ? ? ? this.invocation = invocation;
? ? }
? ? @Override
? ? public void run() {
? ? ? ? try {
? ? ? ? ? ? // 執(zhí)行異步方法
? ? ? ? ? ? invocation.proceed();
? ? ? ? } catch (Throwable ex) {
? ? ? ? ? ? // 處理異常
? ? ? ? }
? ? }
}
public interface TaskExecutor {
? ? void execute(Runnable task);
? ? // ...
}
public class ThreadPoolTaskExecutor implements TaskExecutor {
? ? // ...
? ? @Override
? ? public void execute(Runnable task) {
? ? ? ? // 提交任務(wù)給線程池執(zhí)行
? ? ? ? getThreadPoolExecutor().execute(task);
? ? }
? ? // ...
}
```
在上述代碼中,`AnnotationAsyncExecutionInterceptor`類的`invoke()`方法中,首先根據(jù)`@Async`注解的配置信息,創(chuàng)建一個(gè)`AsyncExecutionRunnable`對(duì)象,將任務(wù)執(zhí)行邏輯封裝在其中。
接著,通過調(diào)用任務(wù)執(zhí)行器的`execute()`方法,將`AsyncExecutionRunnable`對(duì)象提交給任務(wù)執(zhí)行器執(zhí)行。任務(wù)執(zhí)行器在內(nèi)部會(huì)從線程池中獲取一個(gè)空閑的線程,并執(zhí)行`run()`方法中的異步方法調(diào)用。
當(dāng)異步方法執(zhí)行完成或出現(xiàn)異常時(shí),任務(wù)執(zhí)行器會(huì)進(jìn)行相應(yīng)的處理。
通過以上源碼,可以看出代理對(duì)象在調(diào)用異步方法時(shí),會(huì)將異步方法的執(zhí)行請(qǐng)求封裝成一個(gè)`Runnable`或`Callable`的任務(wù),并提交給任務(wù)執(zhí)行器來執(zhí)行。這樣就實(shí)現(xiàn)了異步方法的調(diào)用和執(zhí)行。
### 異步任務(wù)的執(zhí)行
任務(wù)執(zhí)行器會(huì)從線程池中獲取一個(gè)空閑的線程來執(zhí)行異步任務(wù)。異步任務(wù)的執(zhí)行可能會(huì)被放入線程池的任務(wù)隊(duì)列中,直到有線程可用為止。
### 異步方法的返回值處理
如果異步方法有返回值,代理對(duì)象會(huì)返回一個(gè)Future對(duì)象,用于獲取異步方法執(zhí)行的結(jié)果。你可以通過Future對(duì)象來獲取異步方法的返回值或處理異常。
## 更多異步線程池的庫(kù)或框架
除了使用`@Async`注解和自定義線程池外,在Spring框架中,還可以結(jié)合其他異步線程池的庫(kù)或框架來實(shí)現(xiàn)異步任務(wù)的執(zhí)行。以下是一些常見的異步線程池的庫(kù)或框架:
### Guava
Guava是Google提供的Java工具庫(kù),其中包含了一個(gè)`ListeningExecutorService`接口,可以將`ExecutorService`轉(zhuǎn)換為支持異步操作的執(zhí)行器。
### CompletableFuture
`CompletableFuture`是Java 8引入的一個(gè)類,提供了一種方便的方式來執(zhí)行異步任務(wù)??梢越Y(jié)合`CompletableFuture`和`Executor`來實(shí)現(xiàn)異步任務(wù)的執(zhí)行。
#### 用法
`CompletableFuture`是Java 8中引入的一個(gè)類,用于支持異步編程和處理異步任務(wù)的結(jié)果。它提供了一種方便的方式來處理異步操作,并支持將多個(gè)異步任務(wù)組合在一起。下面是`CompletableFuture`的一些常用用法以及如何與Spring Boot一起使用:
1. 創(chuàng)建一個(gè)異步任務(wù):
? ?```java
? ?CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
? ? ? ?// 異步任務(wù)的邏輯
? ? ? ?return "Hello, World!";
? ?});
? ?```
2. 調(diào)用異步任務(wù)的結(jié)果:
? ?```java
? ?future.thenAccept(result -> {
? ? ? ?// 處理異步任務(wù)的結(jié)果
? ? ? ?System.out.println(result);
? ?});
? ?```
3. 組合多個(gè)異步任務(wù):
? ?```java
? ?CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
? ?CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");
? ?CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (result1, result2) -> result1 + " " + result2);
? ?combinedFuture.thenAccept(result -> {
? ? ? ?// 處理組合異步任務(wù)的結(jié)果
? ? ? ?System.out.println(result);
? ?});
? ?```
4. 異步任務(wù)的異常處理:
? ?```java
? ?CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
? ? ? ?// 異步任務(wù)的邏輯
? ? ? ?throw new RuntimeException("Something went wrong");
? ?});
? ?future.exceptionally(ex -> {
? ? ? ?// 處理異步任務(wù)的異常
? ? ? ?System.err.println("Exception: " + ex.getMessage());
? ? ? ?return 0; // 設(shè)置默認(rèn)值
? ?});
? ?```
#### 與springboot集成
與Spring Boot一起使用`CompletableFuture`時(shí),你可以將異步任務(wù)作為Spring Bean的方法,并使用`@Async`注解進(jìn)行標(biāo)記,以便在Spring容器中進(jìn)行管理和調(diào)用。以下是一個(gè)示例:
1. 在Spring Boot應(yīng)用程序的配置類中,啟用異步支持:
? ?```java
? ?@Configuration
? ?@EnableAsync
? ?public class AppConfig {
? ? ? ?// 配置其他Bean和設(shè)置
? ? ? ?// 聲明異步任務(wù)執(zhí)行器
? ? ? ?@Bean
? ? ? ?public Executor asyncExecutor() {
? ? ? ? ? ?ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
? ? ? ? ? ?// 配置線程池參數(shù)
? ? ? ? ? ?// ...
? ? ? ? ? ?return executor;
? ? ? ?}
? ?}
? ?```
2. 在Spring Bean中定義一個(gè)異步方法,使用`CompletableFuture`進(jìn)行異步操作:
? ?```java
? ?@Service
? ?public class MyService {
? ? ? ?@Async
? ? ? ?public CompletableFuture<String> doAsyncTask() {
? ? ? ? ? ?CompletableFuture<String> future = new CompletableFuture<>();
? ? ? ? ? ?// 異步任務(wù)的邏輯
? ? ? ? ? ?// ...
? ? ? ? ? ?return future;
? ? ? ?}
? ?}
? ?```
3. 在其他組件中調(diào)用異步方法,并處理異步結(jié)果:
? ?```java
? ?@RestController
? ?public class MyController {
? ? ? ?@Autowired
? ? ? ?private MyService myService;
? ? ? ?@GetMapping("/async")
? ? ? ?public ResponseEntity<CompletableFuture<String>> asyncEndpoint() {
? ? ? ? ? ?CompletableFuture<String> future = myService.doAsyncTask();
? ? ? ? ? ?return ResponseEntity.ok(future);
? ? ? ?}
? ?}
? ?```
通過將異步任務(wù)定義為Spring Bean中的方法,并使用`@Async`注解進(jìn)行標(biāo)記,Spring Boot將自動(dòng)為這些方法創(chuàng)建異步代理,使它們能夠在單獨(dú)的線程中執(zhí)行。然后,你可以在其他組件中調(diào)用這些異步方法,并使用`CompletableFuture`來處理異步任務(wù)的結(jié)果。
#### 在異步操作時(shí),相比于`Executors.newSingleThreadExecutor().submit`,使用`CompletableFuture.supplyAsync`的優(yōu)勢(shì)
相比于`Executors.newSingleThreadExecutor().submit`,使用`CompletableFuture.supplyAsync`有以下幾個(gè)好處:
1. 異步任務(wù)的創(chuàng)建和執(zhí)行更加簡(jiǎn)潔:使用`CompletableFuture.supplyAsync`可以直接將異步任務(wù)的邏輯封裝在Lambda表達(dá)式中,并將其作為參數(shù)傳遞給方法。這樣可以更加簡(jiǎn)潔地創(chuàng)建和執(zhí)行異步任務(wù),而不需要顯式地創(chuàng)建`Executor`和`Callable`對(duì)象。
2. 支持更多的組合和鏈?zhǔn)讲僮鳎篳CompletableFuture`提供了豐富的方法來處理異步任務(wù)的結(jié)果。你可以使用`thenApply`、`thenAccept`、`thenCombine`等方法來對(duì)異步任務(wù)的結(jié)果進(jìn)行轉(zhuǎn)換、處理和組合。這使得在異步任務(wù)之間進(jìn)行鏈?zhǔn)讲僮骱蛿?shù)據(jù)流轉(zhuǎn)變變得更加便捷。
3. 支持異常處理和超時(shí)控制:`CompletableFuture`提供了方法來處理異步任務(wù)的異常情況,如`exceptionally`、`handle`等。它還支持設(shè)置異步任務(wù)的超時(shí)時(shí)間,通過`get`方法的重載版本來控制等待異步結(jié)果的超時(shí)時(shí)間。這樣可以更好地管理和處理異步任務(wù)中可能出現(xiàn)的異常和超時(shí)情況。
4. 更靈活的執(zhí)行器選擇:`CompletableFuture`允許你傳入自定義的執(zhí)行器(`Executor`)來執(zhí)行異步任務(wù)。這樣你可以根據(jù)需求選擇不同類型的執(zhí)行器,如固定大小線程池、緩存線程池等,以滿足特定的并發(fā)需求。
5. 支持函數(shù)式編程:`CompletableFuture`采用了函數(shù)式編程的風(fēng)格,通過方法鏈?zhǔn)秸{(diào)用和Lambda表達(dá)式來組織異步任務(wù)的邏輯。這種風(fēng)格更加符合現(xiàn)代Java開發(fā)的趨勢(shì),并提供了更高的可讀性和可維護(hù)性。
綜上所述,使用`CompletableFuture.supplyAsync`相比于`Executors.newSingleThreadExecutor().submit`具有更多的優(yōu)勢(shì),能夠提供更靈活、更簡(jiǎn)潔、更可組合的異步任務(wù)編程方式。
### Apache Commons
Apache Commons庫(kù)提供了`ExecutorService`的實(shí)現(xiàn),例如`BasicThreadFactory`和`DefaultExecutorServiceFactory`,可以用于創(chuàng)建和配置線程池。
#### `BasicThreadFactory`
它是一個(gè)簡(jiǎn)單的線程工廠,用于創(chuàng)建線程并配置其屬性。你可以使用它來自定義線程的命名、優(yōu)先級(jí)、守護(hù)狀態(tài)等。
? ?示例代碼:
? ?```java
? ?import org.apache.commons.lang3.concurrent.BasicThreadFactory;
? ?// 創(chuàng)建線程池
? ?ExecutorService executor = Executors.newFixedThreadPool(10,
? ? ? ? ? ?new BasicThreadFactory.Builder()
? ? ? ? ? ? ? ? ? ?.namingPattern("my-pool-thread-%d")
? ? ? ? ? ? ? ? ? ?.daemon(false)
? ? ? ? ? ? ? ? ? ?.priority(Thread.NORM_PRIORITY)
? ? ? ? ? ? ? ? ? ?.build());
? ?// 執(zhí)行任務(wù)
? ?executor.execute(() -> {
? ? ? ?// 異步任務(wù)邏輯
? ?});
? ?```
#### `DefaultExecutorServiceFactory`
它是一個(gè)用于創(chuàng)建`ExecutorService`實(shí)例的工廠類。你可以使用它來創(chuàng)建具有自定義屬性的`ExecutorService`,如核心線程數(shù)、最大線程數(shù)、線程存活時(shí)間等。
? ?示例代碼:
? ?```java
? ?import org.apache.commons.lang3.concurrent.DefaultExecutorServiceFactory;
? ?// 創(chuàng)建線程池工廠
? ?DefaultExecutorServiceFactory factory = new DefaultExecutorServiceFactory();
? ?factory.setMaximumPoolSize(10); // 設(shè)置最大線程數(shù)
? ?factory.setThreadNamePrefix("my-pool-thread-"); // 設(shè)置線程名前綴
? ?// 創(chuàng)建線程池
? ?ExecutorService executor = factory.createExecutorService();
? ?// 執(zhí)行任務(wù)
? ?executor.execute(() -> {
? ? ? ?// 異步任務(wù)邏輯
? ?});
? ?```
這些`ExecutorService`的實(shí)現(xiàn)類可以根據(jù)你的需求進(jìn)行配置和使用,以滿足不同的并發(fā)處理需求。可以根據(jù)自己的項(xiàng)目需要選擇合適的實(shí)現(xiàn)類,并根據(jù)需要設(shè)置線程池的屬性和配置。
### Netflix Hystrix
Hystrix是Netflix開發(fā)的容錯(cuò)庫(kù),提供了異步執(zhí)行和線程池隔離等功能。它可以與Spring集成,用于處理異步任務(wù)的執(zhí)行和容錯(cuò)機(jī)制。
這些庫(kù)或框架提供了更多的功能和選項(xiàng),可以根據(jù)具體需求選擇合適的異步線程池庫(kù)或框架來實(shí)現(xiàn)異步任務(wù)的執(zhí)行。在Spring中,可以通過配置或自定義`TaskExecutor`來集成這些庫(kù)或框架,并使用它們來執(zhí)行異步任務(wù)。