From f68dfde35ee0bc3515e97a2195c6d843bff955aa Mon Sep 17 00:00:00 2001 From: anshlykov Date: Sun, 16 Aug 2020 14:39:11 +0300 Subject: [PATCH] Consider RecordFilterStrategy in Kafka auto-configuration See gh-22973 --- .../KafkaAnnotationDrivenConfiguration.java | 5 ++++- .../kafka/KafkaAutoConfigurationTests.java | 22 +++++++++++++++++++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java index 66c048db49..8f35a9e933 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java @@ -33,6 +33,7 @@ import org.springframework.kafka.listener.BatchErrorHandler; import org.springframework.kafka.listener.ConsumerAwareRebalanceListener; import org.springframework.kafka.listener.ErrorHandler; import org.springframework.kafka.listener.RecordInterceptor; +import org.springframework.kafka.listener.adapter.RecordFilterStrategy; import org.springframework.kafka.support.converter.BatchMessageConverter; import org.springframework.kafka.support.converter.BatchMessagingMessageConverter; import org.springframework.kafka.support.converter.MessageConverter; @@ -113,8 +114,10 @@ class KafkaAnnotationDrivenConfiguration { @ConditionalOnMissingBean(name = "kafkaListenerContainerFactory") ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( ConcurrentKafkaListenerContainerFactoryConfigurer configurer, - ObjectProvider> kafkaConsumerFactory) { + ObjectProvider> kafkaConsumerFactory, + ObjectProvider> kafkaFilterStrategyProvider) { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + kafkaFilterStrategyProvider.ifAvailable(factory::setRecordFilterStrategy); configurer.configure(factory, kafkaConsumerFactory .getIfAvailable(() -> new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties()))); return factory; diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index 24132b1c4d..aa98086ada 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -62,6 +62,7 @@ import org.springframework.kafka.listener.ContainerProperties.AckMode; import org.springframework.kafka.listener.RecordInterceptor; import org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler; import org.springframework.kafka.listener.SeekToCurrentErrorHandler; +import org.springframework.kafka.listener.adapter.RecordFilterStrategy; import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; import org.springframework.kafka.support.converter.BatchMessageConverter; import org.springframework.kafka.support.converter.BatchMessagingMessageConverter; @@ -588,6 +589,16 @@ class KafkaAutoConfigurationTests { }); } + @Test + void testConcurrentKafkaListenerContainerFactoryWithCustomRecordFilterStrategy() { + this.contextRunner.withUserConfiguration(TestRecordFilterStrategyConfiguration.class).run((context) -> { + ConcurrentKafkaListenerContainerFactory factory = context + .getBean(ConcurrentKafkaListenerContainerFactory.class); + assertThat(factory).hasFieldOrPropertyWithValue("recordFilterStrategy", + context.getBean("recordFilterStrategy")); + }); + } + @Configuration(proxyBeanMethods = false) static class MessageConverterConfiguration { @@ -720,4 +731,15 @@ class KafkaAutoConfigurationTests { } + @Configuration(proxyBeanMethods = false) + static class TestRecordFilterStrategyConfiguration { + + @Bean + @SuppressWarnings("unchecked") + RecordFilterStrategy recordFilterStrategy() { + return mock(RecordFilterStrategy.class); + } + + } + }