Handle BatchMessagingMessageConverter with Kafka batch listeners

Closes gh-15942
pull/13916/head
Stephane Nicoll 6 years ago
parent b183c0b709
commit 3d6def9f81

@ -1,5 +1,5 @@
/*
* Copyright 2012-2018 the original author or authors.
* Copyright 2012-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -26,7 +26,7 @@ import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.support.converter.MessageConverter;
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
/**
@ -40,7 +40,7 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
private KafkaProperties properties;
private RecordMessageConverter messageConverter;
private MessageConverter messageConverter;
private KafkaTemplate<Object, Object> replyTemplate;
@ -59,10 +59,10 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
}
/**
* Set the {@link RecordMessageConverter} to use.
* Set the {@link MessageConverter} to use.
* @param messageConverter the message converter
*/
void setMessageConverter(RecordMessageConverter messageConverter) {
void setMessageConverter(MessageConverter messageConverter) {
this.messageConverter = messageConverter;
}

@ -1,5 +1,5 @@
/*
* Copyright 2012-2018 the original author or authors.
* Copyright 2012-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -19,6 +19,7 @@ package org.springframework.boot.autoconfigure.kafka;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener.Type;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
@ -28,6 +29,9 @@ import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.support.converter.BatchMessageConverter;
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
import org.springframework.kafka.support.converter.MessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
@ -46,6 +50,8 @@ class KafkaAnnotationDrivenConfiguration {
private final RecordMessageConverter messageConverter;
private final BatchMessageConverter batchMessageConverter;
private final KafkaTemplate<Object, Object> kafkaTemplate;
private final KafkaAwareTransactionManager<Object, Object> transactionManager;
@ -56,12 +62,15 @@ class KafkaAnnotationDrivenConfiguration {
KafkaAnnotationDrivenConfiguration(KafkaProperties properties,
ObjectProvider<RecordMessageConverter> messageConverter,
ObjectProvider<BatchMessageConverter> batchMessageConverter,
ObjectProvider<KafkaTemplate<Object, Object>> kafkaTemplate,
ObjectProvider<KafkaAwareTransactionManager<Object, Object>> kafkaTransactionManager,
ObjectProvider<ErrorHandler> errorHandler,
ObjectProvider<AfterRollbackProcessor<Object, Object>> afterRollbackProcessor) {
this.properties = properties;
this.messageConverter = messageConverter.getIfUnique();
this.batchMessageConverter = batchMessageConverter.getIfUnique(
() -> new BatchMessagingMessageConverter(this.messageConverter));
this.kafkaTemplate = kafkaTemplate.getIfUnique();
this.transactionManager = kafkaTransactionManager.getIfUnique();
this.errorHandler = errorHandler.getIfUnique();
@ -73,7 +82,9 @@ class KafkaAnnotationDrivenConfiguration {
public ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() {
ConcurrentKafkaListenerContainerFactoryConfigurer configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer();
configurer.setKafkaProperties(this.properties);
configurer.setMessageConverter(this.messageConverter);
MessageConverter messageConverterToUse = (this.properties.getListener().getType()
.equals(Type.BATCH)) ? this.batchMessageConverter : this.messageConverter;
configurer.setMessageConverter(messageConverterToUse);
configurer.setReplyTemplate(this.kafkaTemplate);
configurer.setTransactionManager(this.transactionManager);
configurer.setErrorHandler(this.errorHandler);

@ -1,5 +1,5 @@
/*
* Copyright 2012-2018 the original author or authors.
* Copyright 2012-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -56,6 +56,8 @@ import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.kafka.support.converter.BatchMessageConverter;
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.test.utils.KafkaTestUtils;
@ -503,7 +505,7 @@ public class KafkaAutoConfigurationTests {
}
@Test
public void testConcurrentKafkaListenerContainerFactoryWithCustomMessageConverters() {
public void testConcurrentKafkaListenerContainerFactoryWithCustomMessageConverter() {
this.contextRunner.withUserConfiguration(MessageConverterConfiguration.class)
.run((context) -> {
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory = context
@ -513,6 +515,51 @@ public class KafkaAutoConfigurationTests {
});
}
@Test
public void testConcurrentKafkaListenerContainerFactoryInBatchModeWithCustomMessageConverter() {
this.contextRunner
.withUserConfiguration(BatchMessageConverterConfiguration.class,
MessageConverterConfiguration.class)
.withPropertyValues("spring.kafka.listener.type=batch").run((context) -> {
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory = context
.getBean(ConcurrentKafkaListenerContainerFactory.class);
assertThat(kafkaListenerContainerFactory).hasFieldOrPropertyWithValue(
"messageConverter",
context.getBean("myBatchMessageConverter"));
});
}
@Test
public void testConcurrentKafkaListenerContainerFactoryInBatchModeWrapsCustomMessageConverter() {
this.contextRunner.withUserConfiguration(MessageConverterConfiguration.class)
.withPropertyValues("spring.kafka.listener.type=batch").run((context) -> {
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory = context
.getBean(ConcurrentKafkaListenerContainerFactory.class);
Object messageConverter = ReflectionTestUtils
.getField(kafkaListenerContainerFactory, "messageConverter");
assertThat(messageConverter)
.isInstanceOf(BatchMessagingMessageConverter.class);
assertThat(((BatchMessageConverter) messageConverter)
.getRecordMessageConverter())
.isSameAs(context.getBean("myMessageConverter"));
});
}
@Test
public void testConcurrentKafkaListenerContainerFactoryInBatchModeWithNoMessageConverter() {
this.contextRunner.withPropertyValues("spring.kafka.listener.type=batch")
.run((context) -> {
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory = context
.getBean(ConcurrentKafkaListenerContainerFactory.class);
Object messageConverter = ReflectionTestUtils
.getField(kafkaListenerContainerFactory, "messageConverter");
assertThat(messageConverter)
.isInstanceOf(BatchMessagingMessageConverter.class);
assertThat(((BatchMessageConverter) messageConverter)
.getRecordMessageConverter()).isNull();
});
}
@Test
public void testConcurrentKafkaListenerContainerFactoryWithCustomErrorHandler() {
this.contextRunner.withUserConfiguration(ErrorHandlerConfiguration.class)
@ -583,6 +630,16 @@ public class KafkaAutoConfigurationTests {
}
@Configuration
protected static class BatchMessageConverterConfiguration {
@Bean
public BatchMessageConverter myBatchMessageConverter() {
return mock(BatchMessageConverter.class);
}
}
@Configuration
protected static class ErrorHandlerConfiguration {

@ -5880,9 +5880,12 @@ The following component creates a listener endpoint on the `someTopic` topic:
----
If a `KafkaTransactionManager` bean is defined, it is automatically associated to the
container factory. Similarly, if a `RecordMessageConverter`, `ErrorHandler` or
`AfterRollbackProcessor` bean is defined, it is automatically associated to the default
factory.
container factory. Similarly, if a `ErrorHandler` or `AfterRollbackProcessor` bean is
defined, it is automatically associated to the default factory.
Depending on the listener type, a `RecordMessageConverter` or `BatchMessageConverter` bean
is associated to the default factory. If only a `RecordMessageConverter` bean is present
for a batch listener, it is wrapped in a `BatchMessageConverter`.
TIP: A custom `ChainedKafkaTransactionManager` must be marked `@Primary` as it usually
references the auto-configured `KafkaTransactionManager` bean.

Loading…
Cancel
Save