Enable virtual threads for Kafka listener

Closes gh-36396
pull/36682/head
Moritz Halbritter 1 year ago
parent 7c5ec73724
commit 3a9fadf30f

@ -21,6 +21,7 @@ import java.util.function.Function;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener;
import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
@ -42,6 +43,7 @@ import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
* @author Gary Russell
* @author Eddú Meléndez
* @author Thomas Kåsene
* @author Moritz Halbritter
* @since 1.5.0
*/
public class ConcurrentKafkaListenerContainerFactoryConfigurer {
@ -70,6 +72,8 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
private Function<MessageListenerContainer, String> threadNameSupplier;
private SimpleAsyncTaskExecutor listenerTaskExecutor;
/**
* Set the {@link KafkaProperties} to use.
* @param properties the properties
@ -168,6 +172,14 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
this.threadNameSupplier = threadNameSupplier;
}
/**
* Set the executor for threads that poll the consumer.
* @param listenerTaskExecutor task executor
*/
void setListenerTaskExecutor(SimpleAsyncTaskExecutor listenerTaskExecutor) {
this.listenerTaskExecutor = listenerTaskExecutor;
}
/**
* Configure the specified Kafka listener container factory. The factory can be
* further tuned and default settings can be overridden.
@ -226,6 +238,7 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
map.from(properties::isImmediateStop).to(container::setStopImmediate);
map.from(this.transactionManager).to(container::setTransactionManager);
map.from(this.rebalanceListener).to(container::setConsumerRebalanceListener);
map.from(this.listenerTaskExecutor).to(container::setListenerTaskExecutor);
}
}

@ -21,8 +21,11 @@ import java.util.function.Function;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnThreading;
import org.springframework.boot.autoconfigure.thread.Threading;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.ContainerCustomizer;
@ -49,6 +52,7 @@ import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
* @author Gary Russell
* @author Eddú Meléndez
* @author Thomas Kåsene
* @author Moritz Halbritter
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(EnableKafka.class)
@ -107,7 +111,23 @@ class KafkaAnnotationDrivenConfiguration {
@Bean
@ConditionalOnMissingBean
@ConditionalOnThreading(Threading.PLATFORM)
ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() {
return configurer();
}
@Bean(name = "kafkaListenerContainerFactoryConfigurer")
@ConditionalOnMissingBean
@ConditionalOnThreading(Threading.VIRTUAL)
ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurerVirtualThreads() {
ConcurrentKafkaListenerContainerFactoryConfigurer configurer = configurer();
SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor("kafka-");
executor.setVirtualThreads(true);
configurer.setListenerTaskExecutor(executor);
return configurer;
}
private ConcurrentKafkaListenerContainerFactoryConfigurer configurer() {
ConcurrentKafkaListenerContainerFactoryConfigurer configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer();
configurer.setKafkaProperties(this.properties);
configurer.setBatchMessageConverter(this.batchMessageConverter);

@ -21,10 +21,12 @@ import java.util.function.Function;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.MessageListenerContainer;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.BDDMockito.then;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@ -70,4 +72,12 @@ class ConcurrentKafkaListenerContainerFactoryConfigurerTests {
then(this.factory).should().setChangeConsumerThreadName(true);
}
@Test
void shouldApplyListenerTaskExecutor() {
SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
this.configurer.setListenerTaskExecutor(executor);
this.configurer.configure(this.factory, this.consumerFactory);
assertThat(this.factory.getContainerProperties().getListenerTaskExecutor()).isEqualTo(executor);
}
}

@ -40,6 +40,8 @@ import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledForJreRange;
import org.junit.jupiter.api.condition.JRE;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@ -47,8 +49,11 @@ import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.test.context.assertj.AssertableApplicationContext;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.boot.test.context.runner.ContextConsumer;
import org.springframework.boot.testsupport.assertj.SimpleAsyncTaskExecutorAssert;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.AbstractKafkaListenerContainerFactory;
@ -570,6 +575,31 @@ class KafkaAutoConfigurationTests {
});
}
@Test
void shouldUsePlatformThreadsByDefault() {
this.contextRunner.run((context) -> {
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
.getBean(ConcurrentKafkaListenerContainerFactory.class);
assertThat(factory).isNotNull();
AsyncTaskExecutor listenerTaskExecutor = factory.getContainerProperties().getListenerTaskExecutor();
assertThat(listenerTaskExecutor).isNull();
});
}
@Test
@EnabledForJreRange(min = JRE.JAVA_21)
void shouldUseVirtualThreadsIfEnabled() {
this.contextRunner.withPropertyValues("spring.threads.virtual.enabled=true").run((context) -> {
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
.getBean(ConcurrentKafkaListenerContainerFactory.class);
assertThat(factory).isNotNull();
AsyncTaskExecutor listenerTaskExecutor = factory.getContainerProperties().getListenerTaskExecutor();
assertThat(listenerTaskExecutor).isInstanceOf(SimpleAsyncTaskExecutor.class);
SimpleAsyncTaskExecutorAssert.assertThat((SimpleAsyncTaskExecutor) listenerTaskExecutor)
.usesVirtualThreads();
});
}
@SuppressWarnings("unchecked")
@Test
void listenerProperties() {

Loading…
Cancel
Save