From b1a3dad16cead58676b1eeac25cdb0f6d005d5ae Mon Sep 17 00:00:00 2001 From: Moritz Halbritter Date: Thu, 27 Jul 2023 16:32:30 +0200 Subject: [PATCH] Configure virtual threads on the RabbitMQ listener Closes gh-36387 --- ...RabbitListenerContainerFactoryConfigurer.java | 14 ++++++++++++++ .../RabbitAnnotationDrivenConfiguration.java | 9 ++++++++- .../amqp/RabbitAutoConfigurationTests.java | 16 ++++++++++++++++ 3 files changed, 38 insertions(+), 1 deletion(-) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/AbstractRabbitListenerContainerFactoryConfigurer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/AbstractRabbitListenerContainerFactoryConfigurer.java index 154ac99cea..781029791b 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/AbstractRabbitListenerContainerFactoryConfigurer.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/AbstractRabbitListenerContainerFactoryConfigurer.java @@ -17,6 +17,7 @@ package org.springframework.boot.autoconfigure.amqp; import java.util.List; +import java.util.concurrent.Executor; import org.springframework.amqp.rabbit.config.AbstractRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder; @@ -47,6 +48,8 @@ public abstract class AbstractRabbitListenerContainerFactoryConfigurer builder = (retryConfig.isStateless()) ? RetryInterceptorBuilder.stateless() diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAnnotationDrivenConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAnnotationDrivenConfiguration.java index 51decebff5..2a81759aea 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAnnotationDrivenConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAnnotationDrivenConfiguration.java @@ -30,6 +30,7 @@ import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.autoconfigure.thread.VirtualThreads; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -51,13 +52,17 @@ class RabbitAnnotationDrivenConfiguration { private final RabbitProperties properties; + private final ObjectProvider virtualThreads; + RabbitAnnotationDrivenConfiguration(ObjectProvider messageConverter, ObjectProvider messageRecoverer, - ObjectProvider retryTemplateCustomizers, RabbitProperties properties) { + ObjectProvider retryTemplateCustomizers, RabbitProperties properties, + ObjectProvider virtualThreads) { this.messageConverter = messageConverter; this.messageRecoverer = messageRecoverer; this.retryTemplateCustomizers = retryTemplateCustomizers; this.properties = properties; + this.virtualThreads = virtualThreads; } @Bean @@ -68,6 +73,7 @@ class RabbitAnnotationDrivenConfiguration { configurer.setMessageConverter(this.messageConverter.getIfUnique()); configurer.setMessageRecoverer(this.messageRecoverer.getIfUnique()); configurer.setRetryTemplateCustomizers(this.retryTemplateCustomizers.orderedStream().toList()); + this.virtualThreads.ifAvailable((virtualThreads) -> configurer.setTaskExecutor(virtualThreads.getExecutor())); return configurer; } @@ -92,6 +98,7 @@ class RabbitAnnotationDrivenConfiguration { configurer.setMessageConverter(this.messageConverter.getIfUnique()); configurer.setMessageRecoverer(this.messageRecoverer.getIfUnique()); configurer.setRetryTemplateCustomizers(this.retryTemplateCustomizers.orderedStream().toList()); + this.virtualThreads.ifAvailable((virtualThreads) -> configurer.setTaskExecutor(virtualThreads.getExecutor())); return configurer; } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java index 363a4153b8..afe3271b48 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java @@ -30,6 +30,8 @@ import com.rabbitmq.client.impl.CredentialsRefreshService; import com.rabbitmq.client.impl.DefaultCredentialsProvider; import org.aopalliance.aop.Advice; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledForJreRange; +import org.junit.jupiter.api.condition.JRE; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.InOrder; @@ -58,6 +60,7 @@ import org.springframework.amqp.rabbit.retry.MessageRecoverer; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.NoSuchBeanDefinitionException; import org.springframework.boot.autoconfigure.AutoConfigurations; +import org.springframework.boot.autoconfigure.thread.VirtualThreadsAutoConfiguration; import org.springframework.boot.test.context.assertj.AssertableApplicationContext; import org.springframework.boot.test.context.runner.ApplicationContextRunner; import org.springframework.boot.test.system.CapturedOutput; @@ -532,6 +535,19 @@ class RabbitAutoConfigurationTests { }); } + @Test + @EnabledForJreRange(min = JRE.JAVA_21) + void shouldConfigureVirtualThreads() { + this.contextRunner.withConfiguration(AutoConfigurations.of(VirtualThreadsAutoConfiguration.class)) + .withPropertyValues("spring.threads.virtual.enabled=true") + .run((context) -> { + SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory = context + .getBean("rabbitListenerContainerFactory", SimpleRabbitListenerContainerFactory.class); + Object executor = ReflectionTestUtils.getField(rabbitListenerContainerFactory, "taskExecutor"); + assertThat(executor).as("rabbitListenerContainerFactory.taskExecutor").isNotNull(); + }); + } + @Test void testSimpleRabbitListenerContainerFactoryWithDefaultForceStop() { this.contextRunner