diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java index a0237aaae0..abd81f2114 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java @@ -19,6 +19,7 @@ package org.springframework.boot.autoconfigure.kafka; import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.listener.config.ContainerProperties; import org.springframework.kafka.support.converter.RecordMessageConverter; @@ -35,6 +36,8 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { private RecordMessageConverter messageConverter; + private KafkaTemplate replyTemplate; + /** * Set the {@link KafkaProperties} to use. * @param properties the properties @@ -51,6 +54,14 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { this.messageConverter = messageConverter; } + /** + * Set the {@link KafkaTemplate} to use to send replies. + * @param replyTemplate the reply template + */ + void setReplyTemplate(KafkaTemplate replyTemplate) { + this.replyTemplate = replyTemplate; + } + /** * Configure the specified Kafka listener container factory. The factory can be * further tuned and default settings can be overridden. @@ -65,6 +76,9 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { if (this.messageConverter != null) { listenerContainerFactory.setMessageConverter(this.messageConverter); } + if (this.replyTemplate != null) { + listenerContainerFactory.setReplyTemplate(this.replyTemplate); + } Listener container = this.properties.getListener(); ContainerProperties containerProperties = listenerContainerFactory .getContainerProperties(); diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java index 96040a592b..d52f8a5fa7 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java @@ -25,6 +25,7 @@ import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerConfigUtils; import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.converter.RecordMessageConverter; /** @@ -42,10 +43,14 @@ class KafkaAnnotationDrivenConfiguration { private final RecordMessageConverter messageConverter; + private final KafkaTemplate kafkaTemplate; + KafkaAnnotationDrivenConfiguration(KafkaProperties properties, - ObjectProvider messageConverter) { + ObjectProvider messageConverter, + ObjectProvider> kafkaTemplate) { this.properties = properties; this.messageConverter = messageConverter.getIfUnique(); + this.kafkaTemplate = kafkaTemplate.getIfUnique(); } @Bean @@ -54,6 +59,7 @@ class KafkaAnnotationDrivenConfiguration { ConcurrentKafkaListenerContainerFactoryConfigurer configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer(); configurer.setKafkaProperties(this.properties); configurer.setMessageConverter(this.messageConverter); + configurer.setReplyTemplate(this.kafkaTemplate); return configurer; } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index da34aacec2..a58d02ce17 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -326,6 +326,19 @@ public class KafkaAutoConfigurationTests { }); } + @Test + public void testConcurrentKafkaListenerContainerFactoryWithKafkaTemplate() { + this.contextRunner + .run((context) -> { + ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory = context + .getBean(ConcurrentKafkaListenerContainerFactory.class); + DirectFieldAccessor dfa = new DirectFieldAccessor( + kafkaListenerContainerFactory); + assertThat(dfa.getPropertyValue("replyTemplate")) + .isSameAs(context.getBean(KafkaTemplate.class)); + }); + } + @Configuration protected static class TestConfiguration {