Add support for @SendTo with kafka listener

This commit makes sure that the `replyTemplate` is set if a
KafkaTemplate is available in the context which effectively add support
for `@SendTo`.

Closes gh-10669
pull/10612/merge
Stephane Nicoll 7 years ago
parent 3412ee62d5
commit 852ad093b2

@ -19,6 +19,7 @@ package org.springframework.boot.autoconfigure.kafka;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener; import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.config.ContainerProperties; import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.converter.RecordMessageConverter; import org.springframework.kafka.support.converter.RecordMessageConverter;
@ -35,6 +36,8 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
private RecordMessageConverter messageConverter; private RecordMessageConverter messageConverter;
private KafkaTemplate<Object, Object> replyTemplate;
/** /**
* Set the {@link KafkaProperties} to use. * Set the {@link KafkaProperties} to use.
* @param properties the properties * @param properties the properties
@ -51,6 +54,14 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
this.messageConverter = messageConverter; this.messageConverter = messageConverter;
} }
/**
* Set the {@link KafkaTemplate} to use to send replies.
* @param replyTemplate the reply template
*/
void setReplyTemplate(KafkaTemplate<Object, Object> replyTemplate) {
this.replyTemplate = replyTemplate;
}
/** /**
* Configure the specified Kafka listener container factory. The factory can be * Configure the specified Kafka listener container factory. The factory can be
* further tuned and default settings can be overridden. * further tuned and default settings can be overridden.
@ -65,6 +76,9 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
if (this.messageConverter != null) { if (this.messageConverter != null) {
listenerContainerFactory.setMessageConverter(this.messageConverter); listenerContainerFactory.setMessageConverter(this.messageConverter);
} }
if (this.replyTemplate != null) {
listenerContainerFactory.setReplyTemplate(this.replyTemplate);
}
Listener container = this.properties.getListener(); Listener container = this.properties.getListener();
ContainerProperties containerProperties = listenerContainerFactory ContainerProperties containerProperties = listenerContainerFactory
.getContainerProperties(); .getContainerProperties();

@ -25,6 +25,7 @@ import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerConfigUtils; import org.springframework.kafka.config.KafkaListenerConfigUtils;
import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.converter.RecordMessageConverter; import org.springframework.kafka.support.converter.RecordMessageConverter;
/** /**
@ -42,10 +43,14 @@ class KafkaAnnotationDrivenConfiguration {
private final RecordMessageConverter messageConverter; private final RecordMessageConverter messageConverter;
private final KafkaTemplate<Object, Object> kafkaTemplate;
KafkaAnnotationDrivenConfiguration(KafkaProperties properties, KafkaAnnotationDrivenConfiguration(KafkaProperties properties,
ObjectProvider<RecordMessageConverter> messageConverter) { ObjectProvider<RecordMessageConverter> messageConverter,
ObjectProvider<KafkaTemplate<Object, Object>> kafkaTemplate) {
this.properties = properties; this.properties = properties;
this.messageConverter = messageConverter.getIfUnique(); this.messageConverter = messageConverter.getIfUnique();
this.kafkaTemplate = kafkaTemplate.getIfUnique();
} }
@Bean @Bean
@ -54,6 +59,7 @@ class KafkaAnnotationDrivenConfiguration {
ConcurrentKafkaListenerContainerFactoryConfigurer configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer(); ConcurrentKafkaListenerContainerFactoryConfigurer configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer();
configurer.setKafkaProperties(this.properties); configurer.setKafkaProperties(this.properties);
configurer.setMessageConverter(this.messageConverter); configurer.setMessageConverter(this.messageConverter);
configurer.setReplyTemplate(this.kafkaTemplate);
return configurer; return configurer;
} }

@ -326,6 +326,19 @@ public class KafkaAutoConfigurationTests {
}); });
} }
@Test
public void testConcurrentKafkaListenerContainerFactoryWithKafkaTemplate() {
this.contextRunner
.run((context) -> {
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory = context
.getBean(ConcurrentKafkaListenerContainerFactory.class);
DirectFieldAccessor dfa = new DirectFieldAccessor(
kafkaListenerContainerFactory);
assertThat(dfa.getPropertyValue("replyTemplate"))
.isSameAs(context.getBean(KafkaTemplate.class));
});
}
@Configuration @Configuration
protected static class TestConfiguration { protected static class TestConfiguration {

Loading…
Cancel
Save