Add auto-configuration support for TaskExecutor
This commit adds support for providing a default ThreadPoolTaskExecutor with sensible defaults. A new TaskExecutorBuilder is provided with defaults from the `spring.task.*` namespace and can be used to create custom instances. If no custom `Executor` bean is present, `@EnableAsync` now uses the auto-configure application task executor. Same goes for the async support in Spring MVC. Closes gh-1563pull/14004/head
parent
193b2f187b
commit
c071f34a4a
@ -0,0 +1,90 @@
|
||||
/*
|
||||
* Copyright 2012-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.boot.autoconfigure.task;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.springframework.beans.factory.ObjectProvider;
|
||||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.boot.task.TaskExecutorBuilder;
|
||||
import org.springframework.boot.task.TaskExecutorCustomizer;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.core.task.TaskDecorator;
|
||||
import org.springframework.core.task.TaskExecutor;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
|
||||
/**
|
||||
* {@link EnableAutoConfiguration Auto-configuration} for {@link TaskExecutor}.
|
||||
*
|
||||
* @author Stephane Nicoll
|
||||
* @since 2.1.0
|
||||
*/
|
||||
@ConditionalOnClass(ThreadPoolTaskExecutor.class)
|
||||
@Configuration
|
||||
@EnableConfigurationProperties(TaskProperties.class)
|
||||
public class TaskExecutorAutoConfiguration {
|
||||
|
||||
/**
|
||||
* Bean name of the application {@link TaskExecutor}.
|
||||
*/
|
||||
public static final String APPLICATION_TASK_EXECUTOR_BEAN_NAME = "applicationTaskExecutor";
|
||||
|
||||
private final TaskProperties properties;
|
||||
|
||||
private final ObjectProvider<TaskExecutorCustomizer> taskExecutorCustomizers;
|
||||
|
||||
private final ObjectProvider<TaskDecorator> taskDecorator;
|
||||
|
||||
public TaskExecutorAutoConfiguration(TaskProperties properties,
|
||||
ObjectProvider<TaskExecutorCustomizer> taskExecutorCustomizers,
|
||||
ObjectProvider<TaskDecorator> taskDecorator) {
|
||||
this.properties = properties;
|
||||
this.taskExecutorCustomizers = taskExecutorCustomizers;
|
||||
this.taskDecorator = taskDecorator;
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnMissingBean
|
||||
public TaskExecutorBuilder taskExecutorBuilder() {
|
||||
TaskExecutorBuilder builder = new TaskExecutorBuilder();
|
||||
TaskProperties.Pool pool = this.properties.getPool();
|
||||
builder = builder.queueCapacity(pool.getQueueCapacity())
|
||||
.corePoolSize(pool.getCoreSize()).maxPoolSize(pool.getMaxSize())
|
||||
.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout())
|
||||
.keepAlive(pool.getKeepAlive());
|
||||
builder = builder.threadNamePrefix(this.properties.getThreadNamePrefix());
|
||||
builder = builder.customizers(
|
||||
this.taskExecutorCustomizers.stream().collect(Collectors.toList()));
|
||||
TaskDecorator taskDecorator = this.taskDecorator.getIfUnique();
|
||||
if (taskDecorator != null) {
|
||||
builder = builder.taskDecorator(taskDecorator);
|
||||
}
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Bean(name = APPLICATION_TASK_EXECUTOR_BEAN_NAME)
|
||||
@ConditionalOnMissingBean(Executor.class)
|
||||
public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,123 @@
|
||||
/*
|
||||
* Copyright 2012-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.boot.autoconfigure.task;
|
||||
|
||||
import java.time.Duration;
|
||||
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
|
||||
/**
|
||||
* Configuration properties for task execution.
|
||||
*
|
||||
* @author Stephane Nicoll
|
||||
*/
|
||||
@ConfigurationProperties("spring.task")
|
||||
public class TaskProperties {
|
||||
|
||||
private final Pool pool = new Pool();
|
||||
|
||||
/**
|
||||
* Prefix to use for the names of newly created threads.
|
||||
*/
|
||||
private String threadNamePrefix = "executor-";
|
||||
|
||||
public Pool getPool() {
|
||||
return this.pool;
|
||||
}
|
||||
|
||||
public String getThreadNamePrefix() {
|
||||
return this.threadNamePrefix;
|
||||
}
|
||||
|
||||
public void setThreadNamePrefix(String threadNamePrefix) {
|
||||
this.threadNamePrefix = threadNamePrefix;
|
||||
}
|
||||
|
||||
public static class Pool {
|
||||
|
||||
/**
|
||||
* Queue capacity. A unbounded capacity does not increase the pool and therefore
|
||||
* ignores the "max-size" parameter.
|
||||
*/
|
||||
private int queueCapacity = Integer.MAX_VALUE;
|
||||
|
||||
/**
|
||||
* Core number of threads.
|
||||
*/
|
||||
private int coreSize = 8;
|
||||
|
||||
/**
|
||||
* Maximum allowed number of threads. If tasks are filling up the queue, the pool
|
||||
* can expand up to that size to accommodate the load. Ignored if the queue is
|
||||
* unbounded.
|
||||
*/
|
||||
private int maxSize = Integer.MAX_VALUE;
|
||||
|
||||
/**
|
||||
* Whether core threads are allowed to time out. This enables dynamic growing and
|
||||
* shrinking of the pool.
|
||||
*/
|
||||
private boolean allowCoreThreadTimeout = true;
|
||||
|
||||
/**
|
||||
* Time limit for which threads may remain idle before being terminated.
|
||||
*/
|
||||
private Duration keepAlive = Duration.ofSeconds(60);
|
||||
|
||||
public int getQueueCapacity() {
|
||||
return this.queueCapacity;
|
||||
}
|
||||
|
||||
public void setQueueCapacity(int queueCapacity) {
|
||||
this.queueCapacity = queueCapacity;
|
||||
}
|
||||
|
||||
public int getCoreSize() {
|
||||
return this.coreSize;
|
||||
}
|
||||
|
||||
public void setCoreSize(int coreSize) {
|
||||
this.coreSize = coreSize;
|
||||
}
|
||||
|
||||
public int getMaxSize() {
|
||||
return this.maxSize;
|
||||
}
|
||||
|
||||
public void setMaxSize(int maxSize) {
|
||||
this.maxSize = maxSize;
|
||||
}
|
||||
|
||||
public boolean isAllowCoreThreadTimeout() {
|
||||
return this.allowCoreThreadTimeout;
|
||||
}
|
||||
|
||||
public void setAllowCoreThreadTimeout(boolean allowCoreThreadTimeout) {
|
||||
this.allowCoreThreadTimeout = allowCoreThreadTimeout;
|
||||
}
|
||||
|
||||
public Duration getKeepAlive() {
|
||||
return this.keepAlive;
|
||||
}
|
||||
|
||||
public void setKeepAlive(Duration keepAlive) {
|
||||
this.keepAlive = keepAlive;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,20 @@
|
||||
/*
|
||||
* Copyright 2012-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/**
|
||||
* Auto-configuration for task execution.
|
||||
*/
|
||||
package org.springframework.boot.autoconfigure.task;
|
@ -0,0 +1,212 @@
|
||||
/*
|
||||
* Copyright 2012-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.boot.autoconfigure.task;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import org.springframework.beans.DirectFieldAccessor;
|
||||
import org.springframework.boot.autoconfigure.AutoConfigurations;
|
||||
import org.springframework.boot.task.TaskExecutorBuilder;
|
||||
import org.springframework.boot.task.TaskExecutorCustomizer;
|
||||
import org.springframework.boot.test.context.assertj.AssertableApplicationContext;
|
||||
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
|
||||
import org.springframework.boot.test.context.runner.ContextConsumer;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.core.task.SyncTaskExecutor;
|
||||
import org.springframework.core.task.TaskDecorator;
|
||||
import org.springframework.core.task.TaskExecutor;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.scheduling.annotation.AsyncResult;
|
||||
import org.springframework.scheduling.annotation.EnableAsync;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
import org.springframework.test.util.ReflectionTestUtils;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
/**
|
||||
* Tests for {@link TaskExecutorAutoConfiguration}.
|
||||
*
|
||||
* @author Stephane Nicoll
|
||||
*/
|
||||
public class TaskExecutorAutoConfigurationTests {
|
||||
|
||||
private final ApplicationContextRunner contextRunner = new ApplicationContextRunner()
|
||||
.withConfiguration(
|
||||
AutoConfigurations.of(TaskExecutorAutoConfiguration.class));
|
||||
|
||||
@Test
|
||||
public void taskExecutorBuilderShouldApplyCustomSettings() {
|
||||
this.contextRunner
|
||||
.withPropertyValues("spring.task.pool.queue-capacity=10",
|
||||
"spring.task.pool.core-size=2", "spring.task.pool.max-size=4",
|
||||
"spring.task.pool.allow-core-thread-timeout=true",
|
||||
"spring.task.pool.keep-alive=5s",
|
||||
"spring.task.thread-name-prefix=mytest-")
|
||||
.run(assertTaskExecutor((taskExecutor) -> {
|
||||
DirectFieldAccessor dfa = new DirectFieldAccessor(taskExecutor);
|
||||
assertThat(dfa.getPropertyValue("queueCapacity")).isEqualTo(10);
|
||||
assertThat(taskExecutor.getCorePoolSize()).isEqualTo(2);
|
||||
assertThat(taskExecutor.getMaxPoolSize()).isEqualTo(4);
|
||||
assertThat(dfa.getPropertyValue("allowCoreThreadTimeOut"))
|
||||
.isEqualTo(true);
|
||||
assertThat(taskExecutor.getKeepAliveSeconds()).isEqualTo(5);
|
||||
assertThat(taskExecutor.getThreadNamePrefix()).isEqualTo("mytest-");
|
||||
}));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void taskExecutorBuilderWhenHasCustomBuilderShouldUseCustomBuilder() {
|
||||
this.contextRunner.withUserConfiguration(CustomTaskExecutorBuilderConfig.class)
|
||||
.run((context) -> {
|
||||
assertThat(context).hasSingleBean(TaskExecutorBuilder.class);
|
||||
assertThat(context.getBean(TaskExecutorBuilder.class))
|
||||
.isSameAs(context.getBean(
|
||||
CustomTaskExecutorBuilderConfig.class).taskExecutorBuilder);
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void taskExecutorBuilderShouldUseTaskDecorator() {
|
||||
this.contextRunner.withUserConfiguration(TaskDecoratorConfig.class)
|
||||
.run((context) -> {
|
||||
assertThat(context).hasSingleBean(TaskExecutorBuilder.class);
|
||||
ThreadPoolTaskExecutor executor = context
|
||||
.getBean(TaskExecutorBuilder.class).build();
|
||||
assertThat(ReflectionTestUtils.getField(executor, "taskDecorator"))
|
||||
.isSameAs(context.getBean(TaskDecorator.class));
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void taskExecutorAutoConfigured() {
|
||||
this.contextRunner.run((context) -> {
|
||||
assertThat(context).hasSingleBean(Executor.class);
|
||||
assertThat(context).hasBean("applicationTaskExecutor");
|
||||
assertThat(context).getBean("applicationTaskExecutor")
|
||||
.isInstanceOf(ThreadPoolTaskExecutor.class);
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void taskExecutorWhenHasCustomTaskExecutorShouldBAckOff() {
|
||||
this.contextRunner.withUserConfiguration(CustomTaskExecutorConfig.class)
|
||||
.run((context) -> {
|
||||
assertThat(context).hasSingleBean(Executor.class);
|
||||
assertThat(context.getBean(Executor.class))
|
||||
.isSameAs(context.getBean("customTaskExecutorBuilder"));
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void taskExecutorBuilderShouldApplyCustomizer() {
|
||||
this.contextRunner.withUserConfiguration(CustomTaskExecutorConfig.class,
|
||||
TaskExecutorCustomizerConfig.class).run((context) -> {
|
||||
TaskExecutorCustomizer customizer = context
|
||||
.getBean(TaskExecutorCustomizer.class);
|
||||
ThreadPoolTaskExecutor executor = context
|
||||
.getBean(TaskExecutorBuilder.class).build();
|
||||
verify(customizer).customize(executor);
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void enableAsyncUsesAutoConfiguredOneByDefault() {
|
||||
this.contextRunner
|
||||
.withPropertyValues("spring.task.thread-name-prefix=executor-test-")
|
||||
.withUserConfiguration(AsyncConfiguration.class, TestBean.class)
|
||||
.run((context) -> {
|
||||
assertThat(context).hasSingleBean(TaskExecutor.class);
|
||||
TestBean bean = context.getBean(TestBean.class);
|
||||
String text = bean.echo("test").get();
|
||||
assertThat(text).contains("executor-test-").contains("test");
|
||||
});
|
||||
}
|
||||
|
||||
private ContextConsumer<AssertableApplicationContext> assertTaskExecutor(
|
||||
Consumer<ThreadPoolTaskExecutor> taskExecutor) {
|
||||
return (context) -> {
|
||||
assertThat(context).hasSingleBean(TaskExecutorBuilder.class);
|
||||
TaskExecutorBuilder builder = context.getBean(TaskExecutorBuilder.class);
|
||||
taskExecutor.accept(builder.build());
|
||||
};
|
||||
}
|
||||
|
||||
@Configuration
|
||||
static class CustomTaskExecutorBuilderConfig {
|
||||
|
||||
private final TaskExecutorBuilder taskExecutorBuilder = new TaskExecutorBuilder();
|
||||
|
||||
@Bean
|
||||
public TaskExecutorBuilder customTaskExecutorBuilder() {
|
||||
return this.taskExecutorBuilder;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Configuration
|
||||
static class TaskExecutorCustomizerConfig {
|
||||
|
||||
@Bean
|
||||
public TaskExecutorCustomizer mockTaskExecutorCustomizer() {
|
||||
return mock(TaskExecutorCustomizer.class);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Configuration
|
||||
static class TaskDecoratorConfig {
|
||||
|
||||
@Bean
|
||||
public TaskDecorator mockTaskDecorator() {
|
||||
return mock(TaskDecorator.class);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Configuration
|
||||
static class CustomTaskExecutorConfig {
|
||||
|
||||
@Bean
|
||||
public Executor customTaskExecutorBuilder() {
|
||||
return new SyncTaskExecutor();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Configuration
|
||||
@EnableAsync
|
||||
static class AsyncConfiguration {
|
||||
|
||||
}
|
||||
|
||||
static class TestBean {
|
||||
|
||||
@Async
|
||||
public Future<String> echo(String text) {
|
||||
return new AsyncResult<>(Thread.currentThread().getName() + " " + text);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,307 @@
|
||||
/*
|
||||
* Copyright 2012-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.boot.task;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.Set;
|
||||
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.boot.context.properties.PropertyMapper;
|
||||
import org.springframework.core.task.TaskDecorator;
|
||||
import org.springframework.core.task.TaskExecutor;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
/**
|
||||
* Builder that can be used to configure and create a {@link TaskExecutor}. Provides
|
||||
* convenience methods to set common {@link ThreadPoolTaskExecutor} settings and register
|
||||
* {@link #taskDecorator(TaskDecorator)}). For advanced configuration, consider using
|
||||
* {@link TaskExecutorCustomizer}.
|
||||
* <p>
|
||||
* In a typical auto-configured Spring Boot application this builder is available as a
|
||||
* bean and can be injected whenever a {@link TaskExecutor} is needed.
|
||||
*
|
||||
* @author Stephane Nicoll
|
||||
* @since 2.1.0
|
||||
*/
|
||||
public class TaskExecutorBuilder {
|
||||
|
||||
private final Integer queueCapacity;
|
||||
|
||||
private final Integer corePoolSize;
|
||||
|
||||
private final Integer maxPoolSize;
|
||||
|
||||
private final Boolean allowCoreThreadTimeOut;
|
||||
|
||||
private final Duration keepAlive;
|
||||
|
||||
private final String threadNamePrefix;
|
||||
|
||||
private final TaskDecorator taskDecorator;
|
||||
|
||||
private final Set<TaskExecutorCustomizer> taskExecutorCustomizers;
|
||||
|
||||
public TaskExecutorBuilder(TaskExecutorCustomizer... taskExecutorCustomizers) {
|
||||
Assert.notNull(taskExecutorCustomizers,
|
||||
"TaskExecutorCustomizers must not be null");
|
||||
this.queueCapacity = null;
|
||||
this.corePoolSize = null;
|
||||
this.maxPoolSize = null;
|
||||
this.allowCoreThreadTimeOut = null;
|
||||
this.keepAlive = null;
|
||||
this.threadNamePrefix = null;
|
||||
this.taskDecorator = null;
|
||||
this.taskExecutorCustomizers = Collections.unmodifiableSet(
|
||||
new LinkedHashSet<>(Arrays.asList(taskExecutorCustomizers)));
|
||||
}
|
||||
|
||||
public TaskExecutorBuilder(Integer queueCapacity, Integer corePoolSize,
|
||||
Integer maxPoolSize, Boolean allowCoreThreadTimeOut, Duration keepAlive,
|
||||
String threadNamePrefix, TaskDecorator taskDecorator,
|
||||
Set<TaskExecutorCustomizer> taskExecutorCustomizers) {
|
||||
this.queueCapacity = queueCapacity;
|
||||
this.corePoolSize = corePoolSize;
|
||||
this.maxPoolSize = maxPoolSize;
|
||||
this.allowCoreThreadTimeOut = allowCoreThreadTimeOut;
|
||||
this.keepAlive = keepAlive;
|
||||
this.threadNamePrefix = threadNamePrefix;
|
||||
this.taskDecorator = taskDecorator;
|
||||
this.taskExecutorCustomizers = taskExecutorCustomizers;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the capacity of the queue. A unbounded capacity does not increase the pool and
|
||||
* therefore ignores {@link #maxPoolSize(int) maxPoolSize}.
|
||||
* @param queueCapacity the queue capacity to set
|
||||
* @return a new builder instance
|
||||
*/
|
||||
public TaskExecutorBuilder queueCapacity(int queueCapacity) {
|
||||
return new TaskExecutorBuilder(queueCapacity, this.corePoolSize, this.maxPoolSize,
|
||||
this.allowCoreThreadTimeOut, this.keepAlive, this.threadNamePrefix,
|
||||
this.taskDecorator, this.taskExecutorCustomizers);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the core number of threads. Effectively that maximum number of threads as long
|
||||
* as the queue is not full.
|
||||
* <p>
|
||||
* Core threads can grow and shrink if {@link #allowCoreThreadTimeOut(boolean)} is
|
||||
* enabled.
|
||||
* @param corePoolSize the core pool size to set
|
||||
* @return a new builder instance
|
||||
*/
|
||||
public TaskExecutorBuilder corePoolSize(int corePoolSize) {
|
||||
return new TaskExecutorBuilder(this.queueCapacity, corePoolSize, this.maxPoolSize,
|
||||
this.allowCoreThreadTimeOut, this.keepAlive, this.threadNamePrefix,
|
||||
this.taskDecorator, this.taskExecutorCustomizers);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the maximum allowed number of threads. When the {@link #queueCapacity(int)
|
||||
* queue} is full, the pool can expand up to that size to accommodate the load.
|
||||
* <p>
|
||||
* If the {@link #queueCapacity(int) queue capacity} is unbounded, this setting is
|
||||
* ignored.
|
||||
* @param maxPoolSize the max pool size to set
|
||||
* @return a new builder instance
|
||||
*/
|
||||
public TaskExecutorBuilder maxPoolSize(int maxPoolSize) {
|
||||
return new TaskExecutorBuilder(this.queueCapacity, this.corePoolSize, maxPoolSize,
|
||||
this.allowCoreThreadTimeOut, this.keepAlive, this.threadNamePrefix,
|
||||
this.taskDecorator, this.taskExecutorCustomizers);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set whether core threads are allow to time out. When enabled, this enables dynamic
|
||||
* growing and shrinking of the pool.
|
||||
* @param allowCoreThreadTimeOut if core thread are allowed to time out
|
||||
* @return a new builder instance
|
||||
*/
|
||||
public TaskExecutorBuilder allowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) {
|
||||
return new TaskExecutorBuilder(this.queueCapacity, this.corePoolSize,
|
||||
this.maxPoolSize, allowCoreThreadTimeOut, this.keepAlive,
|
||||
this.threadNamePrefix, this.taskDecorator, this.taskExecutorCustomizers);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the time limit for which threads may remain idle before being terminated.
|
||||
* @param keepAlive the keep alive to set
|
||||
* @return a new builder instance
|
||||
*/
|
||||
public TaskExecutorBuilder keepAlive(Duration keepAlive) {
|
||||
return new TaskExecutorBuilder(this.queueCapacity, this.corePoolSize,
|
||||
this.maxPoolSize, this.allowCoreThreadTimeOut, keepAlive,
|
||||
this.threadNamePrefix, this.taskDecorator, this.taskExecutorCustomizers);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the prefix to use for the names of newly created threads.
|
||||
* @param threadNamePrefix the thread name prefix to set
|
||||
* @return a new builder instance
|
||||
*/
|
||||
public TaskExecutorBuilder threadNamePrefix(String threadNamePrefix) {
|
||||
return new TaskExecutorBuilder(this.queueCapacity, this.corePoolSize,
|
||||
this.maxPoolSize, this.allowCoreThreadTimeOut, this.keepAlive,
|
||||
threadNamePrefix, this.taskDecorator, this.taskExecutorCustomizers);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the {@link TaskDecorator} to use or {@code null} to not use any.
|
||||
* @param taskDecorator the task decorator to use
|
||||
* @return a new builder instance
|
||||
*/
|
||||
public TaskExecutorBuilder taskDecorator(TaskDecorator taskDecorator) {
|
||||
return new TaskExecutorBuilder(this.queueCapacity, this.corePoolSize,
|
||||
this.maxPoolSize, this.allowCoreThreadTimeOut, this.keepAlive,
|
||||
this.threadNamePrefix, taskDecorator, this.taskExecutorCustomizers);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the {@link TaskExecutorCustomizer TaskExecutorCustomizers} that should be
|
||||
* applied to the {@link ThreadPoolTaskExecutor}. Customizers are applied in the order
|
||||
* that they were added after builder configuration has been applied. Setting this
|
||||
* value will replace any previously configured customizers.
|
||||
* @param taskExecutorCustomizers the customizers to set
|
||||
* @return a new builder instance
|
||||
* @see #additionalCustomizers(TaskExecutorCustomizer...)
|
||||
*/
|
||||
public TaskExecutorBuilder customizers(
|
||||
TaskExecutorCustomizer... taskExecutorCustomizers) {
|
||||
Assert.notNull(taskExecutorCustomizers,
|
||||
"TaskExecutorCustomizers must not be null");
|
||||
return customizers(Arrays.asList(taskExecutorCustomizers));
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the {@link TaskExecutorCustomizer TaskExecutorCustomizers} that should be
|
||||
* applied to the {@link ThreadPoolTaskExecutor}. Customizers are applied in the order
|
||||
* that they were added after builder configuration has been applied. Setting this
|
||||
* value will replace any previously configured customizers.
|
||||
* @param taskExecutorCustomizers the customizers to set
|
||||
* @return a new builder instance
|
||||
* @see #additionalCustomizers(TaskExecutorCustomizer...)
|
||||
*/
|
||||
public TaskExecutorBuilder customizers(
|
||||
Collection<? extends TaskExecutorCustomizer> taskExecutorCustomizers) {
|
||||
Assert.notNull(taskExecutorCustomizers,
|
||||
"TaskExecutorCustomizers must not be null");
|
||||
return new TaskExecutorBuilder(this.queueCapacity, this.corePoolSize,
|
||||
this.maxPoolSize, this.allowCoreThreadTimeOut, this.keepAlive,
|
||||
this.threadNamePrefix, this.taskDecorator,
|
||||
Collections.unmodifiableSet(new LinkedHashSet<TaskExecutorCustomizer>(
|
||||
taskExecutorCustomizers)));
|
||||
}
|
||||
|
||||
/**
|
||||
* Add {@link TaskExecutorCustomizer TaskExecutorCustomizers} that should be applied
|
||||
* to the {@link ThreadPoolTaskExecutor}. Customizers are applied in the order that
|
||||
* they were added after builder configuration has been applied.
|
||||
* @param taskExecutorCustomizers the customizers to add
|
||||
* @return a new builder instance
|
||||
* @see #customizers(TaskExecutorCustomizer...)
|
||||
*/
|
||||
public TaskExecutorBuilder additionalCustomizers(
|
||||
TaskExecutorCustomizer... taskExecutorCustomizers) {
|
||||
Assert.notNull(taskExecutorCustomizers,
|
||||
"TaskExecutorCustomizers must not be null");
|
||||
return additionalCustomizers(Arrays.asList(taskExecutorCustomizers));
|
||||
}
|
||||
|
||||
/**
|
||||
* Add {@link TaskExecutorCustomizer TaskExecutorCustomizers} that should be applied
|
||||
* to the {@link ThreadPoolTaskExecutor}. Customizers are applied in the order that
|
||||
* they were added after builder configuration has been applied.
|
||||
* @param taskExecutorCustomizers the customizers to add
|
||||
* @return a new builder instance
|
||||
* @see #customizers(TaskExecutorCustomizer...)
|
||||
*/
|
||||
public TaskExecutorBuilder additionalCustomizers(
|
||||
Collection<? extends TaskExecutorCustomizer> taskExecutorCustomizers) {
|
||||
Assert.notNull(taskExecutorCustomizers,
|
||||
"TaskExecutorCustomizers must not be null");
|
||||
return new TaskExecutorBuilder(this.queueCapacity, this.corePoolSize,
|
||||
this.maxPoolSize, this.allowCoreThreadTimeOut, this.keepAlive,
|
||||
this.threadNamePrefix, this.taskDecorator,
|
||||
append(this.taskExecutorCustomizers, taskExecutorCustomizers));
|
||||
}
|
||||
|
||||
/**
|
||||
* Build a new {@link ThreadPoolTaskExecutor} instance and configure it using this
|
||||
* builder.
|
||||
* @return a configured {@link ThreadPoolTaskExecutor} instance.
|
||||
* @see #build(Class)
|
||||
* @see #configure(ThreadPoolTaskExecutor)
|
||||
*/
|
||||
public ThreadPoolTaskExecutor build() {
|
||||
return build(ThreadPoolTaskExecutor.class);
|
||||
}
|
||||
|
||||
/**
|
||||
* Build a new {@link ThreadPoolTaskExecutor} instance of the specified type and
|
||||
* configure it using this builder.
|
||||
* @param <T> the type of task executor
|
||||
* @param taskExecutorClass the template type to create
|
||||
* @return a configured {@link ThreadPoolTaskExecutor} instance.
|
||||
* @see TaskExecutorBuilder#build()
|
||||
* @see #configure(ThreadPoolTaskExecutor)
|
||||
*/
|
||||
public <T extends ThreadPoolTaskExecutor> T build(Class<T> taskExecutorClass) {
|
||||
return configure(BeanUtils.instantiateClass(taskExecutorClass));
|
||||
}
|
||||
|
||||
/**
|
||||
* Configure the provided {@link ThreadPoolTaskExecutor} instance using this builder.
|
||||
* @param <T> the type of task executor
|
||||
* @param taskExecutor the {@link ThreadPoolTaskExecutor} to configure
|
||||
* @return the task executor instance
|
||||
* @see TaskExecutorBuilder#build()
|
||||
* @see TaskExecutorBuilder#build(Class)
|
||||
*/
|
||||
public <T extends ThreadPoolTaskExecutor> T configure(T taskExecutor) {
|
||||
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
|
||||
map.from(() -> this.queueCapacity).to(taskExecutor::setQueueCapacity);
|
||||
map.from(() -> this.corePoolSize).to(taskExecutor::setCorePoolSize);
|
||||
map.from(() -> this.maxPoolSize).to(taskExecutor::setMaxPoolSize);
|
||||
map.from(() -> this.keepAlive).asInt(Duration::getSeconds)
|
||||
.to(taskExecutor::setKeepAliveSeconds);
|
||||
map.from(() -> this.allowCoreThreadTimeOut)
|
||||
.to(taskExecutor::setAllowCoreThreadTimeOut);
|
||||
map.from(() -> this.threadNamePrefix).whenHasText()
|
||||
.to(taskExecutor::setThreadNamePrefix);
|
||||
map.from(() -> this.taskDecorator).to(taskExecutor::setTaskDecorator);
|
||||
|
||||
if (!CollectionUtils.isEmpty(this.taskExecutorCustomizers)) {
|
||||
for (TaskExecutorCustomizer customizer : this.taskExecutorCustomizers) {
|
||||
customizer.customize(taskExecutor);
|
||||
}
|
||||
}
|
||||
return taskExecutor;
|
||||
}
|
||||
|
||||
private static <T> Set<T> append(Set<T> set, Collection<? extends T> additions) {
|
||||
Set<T> result = new LinkedHashSet<>((set != null) ? set : Collections.emptySet());
|
||||
result.addAll(additions);
|
||||
return Collections.unmodifiableSet(result);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,37 @@
|
||||
/*
|
||||
* Copyright 2012-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.boot.task;
|
||||
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
|
||||
/**
|
||||
* Callback interface that can be used to customize a {@link ThreadPoolTaskExecutor}.
|
||||
*
|
||||
* @author Stephane Nicoll
|
||||
* @since 2.1.0
|
||||
* @see TaskExecutorBuilder
|
||||
*/
|
||||
@FunctionalInterface
|
||||
public interface TaskExecutorCustomizer {
|
||||
|
||||
/**
|
||||
* Callback to customize a {@link ThreadPoolTaskExecutor} instance.
|
||||
* @param taskExecutor the task executor to customize
|
||||
*/
|
||||
void customize(ThreadPoolTaskExecutor taskExecutor);
|
||||
|
||||
}
|
@ -0,0 +1,22 @@
|
||||
/*
|
||||
* Copyright 2012-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/**
|
||||
* Task execution utilities.
|
||||
*
|
||||
* @author Stephane Nicoll
|
||||
*/
|
||||
package org.springframework.boot.task;
|
@ -0,0 +1,159 @@
|
||||
/*
|
||||
* Copyright 2012-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.boot.task;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Collections;
|
||||
import java.util.Set;
|
||||
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
|
||||
import org.springframework.beans.DirectFieldAccessor;
|
||||
import org.springframework.core.task.TaskDecorator;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
import org.springframework.test.util.ReflectionTestUtils;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.verifyZeroInteractions;
|
||||
|
||||
/**
|
||||
* Tests for {@link TaskExecutorBuilder}.
|
||||
*
|
||||
* @author Stephane Nicoll
|
||||
*/
|
||||
public class TaskExecutorBuilderTests {
|
||||
|
||||
@Rule
|
||||
public ExpectedException thrown = ExpectedException.none();
|
||||
|
||||
private TaskExecutorBuilder builder = new TaskExecutorBuilder();
|
||||
|
||||
@Test
|
||||
public void createWhenCustomizersAreNullShouldThrowException() {
|
||||
this.thrown.expect(IllegalArgumentException.class);
|
||||
this.thrown.expectMessage("TaskExecutorCustomizers must not be null");
|
||||
new TaskExecutorBuilder((TaskExecutorCustomizer[]) null);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void poolSettingsShouldApply() {
|
||||
ThreadPoolTaskExecutor executor = this.builder.allowCoreThreadTimeOut(true)
|
||||
.queueCapacity(10).corePoolSize(4).maxPoolSize(8)
|
||||
.allowCoreThreadTimeOut(true).keepAlive(Duration.ofMinutes(1)).build();
|
||||
DirectFieldAccessor dfa = new DirectFieldAccessor(executor);
|
||||
assertThat(dfa.getPropertyValue("queueCapacity")).isEqualTo(10);
|
||||
assertThat(executor.getCorePoolSize()).isEqualTo(4);
|
||||
assertThat(executor.getMaxPoolSize()).isEqualTo(8);
|
||||
assertThat(dfa.getPropertyValue("allowCoreThreadTimeOut")).isEqualTo(true);
|
||||
assertThat(executor.getKeepAliveSeconds()).isEqualTo(60);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void threadNamePrefixShouldApply() {
|
||||
ThreadPoolTaskExecutor executor = this.builder.threadNamePrefix("test-").build();
|
||||
assertThat(executor.getThreadNamePrefix()).isEqualTo("test-");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void taskDecoratorShouldApply() {
|
||||
TaskDecorator taskDecorator = mock(TaskDecorator.class);
|
||||
ThreadPoolTaskExecutor executor = this.builder.taskDecorator(taskDecorator)
|
||||
.build();
|
||||
assertThat(ReflectionTestUtils.getField(executor, "taskDecorator"))
|
||||
.isSameAs(taskDecorator);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void customizersWhenCustomizersAreNullShouldThrowException() {
|
||||
this.thrown.expect(IllegalArgumentException.class);
|
||||
this.thrown.expectMessage("TaskExecutorCustomizers must not be null");
|
||||
this.builder.customizers((TaskExecutorCustomizer[]) null);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void customizersCollectionWhenCustomizersAreNullShouldThrowException() {
|
||||
this.thrown.expect(IllegalArgumentException.class);
|
||||
this.thrown.expectMessage("TaskExecutorCustomizers must not be null");
|
||||
this.builder.customizers((Set<TaskExecutorCustomizer>) null);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void customizersShouldApply() {
|
||||
TaskExecutorCustomizer customizer = mock(TaskExecutorCustomizer.class);
|
||||
ThreadPoolTaskExecutor executor = this.builder.customizers(customizer).build();
|
||||
verify(customizer).customize(executor);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void customizersShouldBeAppliedLast() {
|
||||
TaskDecorator taskDecorator = mock(TaskDecorator.class);
|
||||
ThreadPoolTaskExecutor executor = spy(new ThreadPoolTaskExecutor());
|
||||
this.builder.allowCoreThreadTimeOut(true).queueCapacity(10).corePoolSize(4)
|
||||
.maxPoolSize(8).allowCoreThreadTimeOut(true)
|
||||
.keepAlive(Duration.ofMinutes(1)).threadNamePrefix("test-")
|
||||
.taskDecorator(taskDecorator).additionalCustomizers((taskExecutor) -> {
|
||||
verify(taskExecutor).setQueueCapacity(10);
|
||||
verify(taskExecutor).setCorePoolSize(4);
|
||||
verify(taskExecutor).setMaxPoolSize(8);
|
||||
verify(taskExecutor).setAllowCoreThreadTimeOut(true);
|
||||
verify(taskExecutor).setKeepAliveSeconds(60);
|
||||
verify(taskExecutor).setThreadNamePrefix("test-");
|
||||
verify(taskExecutor).setTaskDecorator(taskDecorator);
|
||||
});
|
||||
this.builder.configure(executor);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void customizersShouldReplaceExisting() {
|
||||
TaskExecutorCustomizer customizer1 = mock(TaskExecutorCustomizer.class);
|
||||
TaskExecutorCustomizer customizer2 = mock(TaskExecutorCustomizer.class);
|
||||
ThreadPoolTaskExecutor executor = this.builder.customizers(customizer1)
|
||||
.customizers(Collections.singleton(customizer2)).build();
|
||||
verifyZeroInteractions(customizer1);
|
||||
verify(customizer2).customize(executor);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void additionalCustomizersWhenCustomizersAreNullShouldThrowException() {
|
||||
this.thrown.expect(IllegalArgumentException.class);
|
||||
this.thrown.expectMessage("TaskExecutorCustomizers must not be null");
|
||||
this.builder.additionalCustomizers((TaskExecutorCustomizer[]) null);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void additionalCustomizersCollectionWhenCustomizersAreNullShouldThrowException() {
|
||||
this.thrown.expect(IllegalArgumentException.class);
|
||||
this.thrown.expectMessage("TaskExecutorCustomizers must not be null");
|
||||
this.builder.additionalCustomizers((Set<TaskExecutorCustomizer>) null);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void additionalCustomizersShouldAddToExisting() {
|
||||
TaskExecutorCustomizer customizer1 = mock(TaskExecutorCustomizer.class);
|
||||
TaskExecutorCustomizer customizer2 = mock(TaskExecutorCustomizer.class);
|
||||
ThreadPoolTaskExecutor executor = this.builder.customizers(customizer1)
|
||||
.additionalCustomizers(customizer2).build();
|
||||
verify(customizer1).customize(executor);
|
||||
verify(customizer2).customize(executor);
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue