Configure virtual threads on the RabbitMQ listener

Closes gh-36387
pull/36614/head
Moritz Halbritter 1 year ago
parent f85a7258a6
commit b1a3dad16c

@ -17,6 +17,7 @@
package org.springframework.boot.autoconfigure.amqp;
import java.util.List;
import java.util.concurrent.Executor;
import org.springframework.amqp.rabbit.config.AbstractRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
@ -47,6 +48,8 @@ public abstract class AbstractRabbitListenerContainerFactoryConfigurer<T extends
private final RabbitProperties rabbitProperties;
private Executor taskExecutor;
/**
* Creates a new configurer that will use the given {@code rabbitProperties}.
* @param rabbitProperties properties to use
@ -81,6 +84,14 @@ public abstract class AbstractRabbitListenerContainerFactoryConfigurer<T extends
this.retryTemplateCustomizers = retryTemplateCustomizers;
}
/**
* Set the task executor to use.
* @param taskExecutor the task executor
*/
public void setTaskExecutor(Executor taskExecutor) {
this.taskExecutor = taskExecutor;
}
protected final RabbitProperties getRabbitProperties() {
return this.rabbitProperties;
}
@ -119,6 +130,9 @@ public abstract class AbstractRabbitListenerContainerFactoryConfigurer<T extends
factory.setMissingQueuesFatal(configuration.isMissingQueuesFatal());
factory.setDeBatchingEnabled(configuration.isDeBatchingEnabled());
factory.setForceStop(configuration.isForceStop());
if (this.taskExecutor != null) {
factory.setTaskExecutor(this.taskExecutor);
}
ListenerRetry retryConfig = configuration.getRetry();
if (retryConfig.isEnabled()) {
RetryInterceptorBuilder<?, ?> builder = (retryConfig.isStateless()) ? RetryInterceptorBuilder.stateless()

@ -30,6 +30,7 @@ import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.thread.VirtualThreads;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@ -51,13 +52,17 @@ class RabbitAnnotationDrivenConfiguration {
private final RabbitProperties properties;
private final ObjectProvider<VirtualThreads> virtualThreads;
RabbitAnnotationDrivenConfiguration(ObjectProvider<MessageConverter> messageConverter,
ObjectProvider<MessageRecoverer> messageRecoverer,
ObjectProvider<RabbitRetryTemplateCustomizer> retryTemplateCustomizers, RabbitProperties properties) {
ObjectProvider<RabbitRetryTemplateCustomizer> retryTemplateCustomizers, RabbitProperties properties,
ObjectProvider<VirtualThreads> virtualThreads) {
this.messageConverter = messageConverter;
this.messageRecoverer = messageRecoverer;
this.retryTemplateCustomizers = retryTemplateCustomizers;
this.properties = properties;
this.virtualThreads = virtualThreads;
}
@Bean
@ -68,6 +73,7 @@ class RabbitAnnotationDrivenConfiguration {
configurer.setMessageConverter(this.messageConverter.getIfUnique());
configurer.setMessageRecoverer(this.messageRecoverer.getIfUnique());
configurer.setRetryTemplateCustomizers(this.retryTemplateCustomizers.orderedStream().toList());
this.virtualThreads.ifAvailable((virtualThreads) -> configurer.setTaskExecutor(virtualThreads.getExecutor()));
return configurer;
}
@ -92,6 +98,7 @@ class RabbitAnnotationDrivenConfiguration {
configurer.setMessageConverter(this.messageConverter.getIfUnique());
configurer.setMessageRecoverer(this.messageRecoverer.getIfUnique());
configurer.setRetryTemplateCustomizers(this.retryTemplateCustomizers.orderedStream().toList());
this.virtualThreads.ifAvailable((virtualThreads) -> configurer.setTaskExecutor(virtualThreads.getExecutor()));
return configurer;
}

@ -30,6 +30,8 @@ import com.rabbitmq.client.impl.CredentialsRefreshService;
import com.rabbitmq.client.impl.DefaultCredentialsProvider;
import org.aopalliance.aop.Advice;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledForJreRange;
import org.junit.jupiter.api.condition.JRE;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InOrder;
@ -58,6 +60,7 @@ import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.autoconfigure.thread.VirtualThreadsAutoConfiguration;
import org.springframework.boot.test.context.assertj.AssertableApplicationContext;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.boot.test.system.CapturedOutput;
@ -532,6 +535,19 @@ class RabbitAutoConfigurationTests {
});
}
@Test
@EnabledForJreRange(min = JRE.JAVA_21)
void shouldConfigureVirtualThreads() {
this.contextRunner.withConfiguration(AutoConfigurations.of(VirtualThreadsAutoConfiguration.class))
.withPropertyValues("spring.threads.virtual.enabled=true")
.run((context) -> {
SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory = context
.getBean("rabbitListenerContainerFactory", SimpleRabbitListenerContainerFactory.class);
Object executor = ReflectionTestUtils.getField(rabbitListenerContainerFactory, "taskExecutor");
assertThat(executor).as("rabbitListenerContainerFactory.taskExecutor").isNotNull();
});
}
@Test
void testSimpleRabbitListenerContainerFactoryWithDefaultForceStop() {
this.contextRunner

Loading…
Cancel
Save