From d1a089ba714f4f22df9dd6cbea85c3646925b72d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Thomas=20K=C3=A5sene?= <6691406+ThomasKasene@users.noreply.github.com> Date: Tue, 1 Nov 2022 19:20:57 +0100 Subject: [PATCH] Auto-configure BatchInterceptor on ConcurrentKafkaListenerContainerFactory See gh-32951 --- ...fkaListenerContainerFactoryConfigurer.java | 13 ++++++++++++ .../KafkaAnnotationDrivenConfiguration.java | 9 +++++++- .../kafka/KafkaAutoConfigurationTests.java | 21 +++++++++++++++++++ 3 files changed, 42 insertions(+), 1 deletion(-) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java index c0ae97f999..55a1b2d799 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java @@ -24,6 +24,7 @@ import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.listener.AfterRollbackProcessor; +import org.springframework.kafka.listener.BatchInterceptor; import org.springframework.kafka.listener.CommonErrorHandler; import org.springframework.kafka.listener.ConsumerAwareRebalanceListener; import org.springframework.kafka.listener.ContainerProperties; @@ -37,6 +38,7 @@ import org.springframework.kafka.transaction.KafkaAwareTransactionManager; * * @author Gary Russell * @author Eddú Meléndez + * @author Thomas Kåsene * @since 1.5.0 */ public class ConcurrentKafkaListenerContainerFactoryConfigurer { @@ -59,6 +61,8 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { private RecordInterceptor recordInterceptor; + private BatchInterceptor batchInterceptor; + /** * Set the {@link KafkaProperties} to use. * @param properties the properties @@ -133,6 +137,14 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { this.recordInterceptor = recordInterceptor; } + /** + * Set the {@link BatchInterceptor} to use. + * @param batchInterceptor the batch interceptor. + */ + void setBatchInterceptor(BatchInterceptor batchInterceptor) { + this.batchInterceptor = batchInterceptor; + } + /** * Configure the specified Kafka listener container factory. The factory can be * further tuned and default settings can be overridden. @@ -161,6 +173,7 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { map.from(this.commonErrorHandler).to(factory::setCommonErrorHandler); map.from(this.afterRollbackProcessor).to(factory::setAfterRollbackProcessor); map.from(this.recordInterceptor).to(factory::setRecordInterceptor); + map.from(this.batchInterceptor).to(factory::setBatchInterceptor); } private void configureContainer(ContainerProperties container) { diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java index 5a6fb0e523..996b2199d8 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java @@ -29,6 +29,7 @@ import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.listener.AfterRollbackProcessor; +import org.springframework.kafka.listener.BatchInterceptor; import org.springframework.kafka.listener.CommonErrorHandler; import org.springframework.kafka.listener.ConsumerAwareRebalanceListener; import org.springframework.kafka.listener.RecordInterceptor; @@ -44,6 +45,7 @@ import org.springframework.kafka.transaction.KafkaAwareTransactionManager; * * @author Gary Russell * @author Eddú Meléndez + * @author Thomas Kåsene */ @Configuration(proxyBeanMethods = false) @ConditionalOnClass(EnableKafka.class) @@ -69,6 +71,8 @@ class KafkaAnnotationDrivenConfiguration { private final RecordInterceptor recordInterceptor; + private final BatchInterceptor batchInterceptor; + KafkaAnnotationDrivenConfiguration(KafkaProperties properties, ObjectProvider messageConverter, ObjectProvider> recordFilterStrategy, @@ -78,7 +82,8 @@ class KafkaAnnotationDrivenConfiguration { ObjectProvider rebalanceListener, ObjectProvider commonErrorHandler, ObjectProvider> afterRollbackProcessor, - ObjectProvider> recordInterceptor) { + ObjectProvider> recordInterceptor, + ObjectProvider> batchInterceptor) { this.properties = properties; this.messageConverter = messageConverter.getIfUnique(); this.recordFilterStrategy = recordFilterStrategy.getIfUnique(); @@ -90,6 +95,7 @@ class KafkaAnnotationDrivenConfiguration { this.commonErrorHandler = commonErrorHandler.getIfUnique(); this.afterRollbackProcessor = afterRollbackProcessor.getIfUnique(); this.recordInterceptor = recordInterceptor.getIfUnique(); + this.batchInterceptor = batchInterceptor.getIfUnique(); } @Bean @@ -107,6 +113,7 @@ class KafkaAnnotationDrivenConfiguration { configurer.setCommonErrorHandler(this.commonErrorHandler); configurer.setAfterRollbackProcessor(this.afterRollbackProcessor); configurer.setRecordInterceptor(this.recordInterceptor); + configurer.setBatchInterceptor(this.batchInterceptor); return configurer; } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index fee655bcd6..ec53a2d850 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -63,6 +63,7 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaAdmin; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.listener.AfterRollbackProcessor; +import org.springframework.kafka.listener.BatchInterceptor; import org.springframework.kafka.listener.CommonErrorHandler; import org.springframework.kafka.listener.ConsumerAwareRebalanceListener; import org.springframework.kafka.listener.ContainerProperties; @@ -95,6 +96,7 @@ import static org.mockito.Mockito.never; * @author Eddú Meléndez * @author Nakul Mishra * @author Tomaz Fernandes + * @author Thomas Kåsene */ class KafkaAutoConfigurationTests { @@ -645,6 +647,15 @@ class KafkaAutoConfigurationTests { }); } + @Test + void testConcurrentKafkaListenerContainerFactoryWithCustomBatchInterceptor() { + this.contextRunner.withUserConfiguration(BatchInterceptorConfiguration.class).run((context) -> { + ConcurrentKafkaListenerContainerFactory factory = context + .getBean(ConcurrentKafkaListenerContainerFactory.class); + assertThat(factory).hasFieldOrPropertyWithValue("batchInterceptor", context.getBean("batchInterceptor")); + }); + } + @Test void testConcurrentKafkaListenerContainerFactoryWithCustomRebalanceListener() { this.contextRunner.withUserConfiguration(RebalanceListenerConfiguration.class).run((context) -> { @@ -764,6 +775,16 @@ class KafkaAutoConfigurationTests { } + @Configuration(proxyBeanMethods = false) + static class BatchInterceptorConfiguration { + + @Bean + BatchInterceptor batchInterceptor() { + return (batch, consumer) -> batch; + } + + } + @Configuration(proxyBeanMethods = false) static class RebalanceListenerConfiguration {