diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java index 28ca5b4914..e6d0ac47d0 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2019 the original author or authors. + * Copyright 2012-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,6 +29,7 @@ import org.springframework.kafka.listener.ConsumerAwareRebalanceListener; import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.ErrorHandler; import org.springframework.kafka.listener.RecordInterceptor; +import org.springframework.kafka.listener.adapter.RecordFilterStrategy; import org.springframework.kafka.support.converter.MessageConverter; import org.springframework.kafka.transaction.KafkaAwareTransactionManager; @@ -45,6 +46,8 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { private MessageConverter messageConverter; + private RecordFilterStrategy recordFilterStrategy; + private KafkaTemplate replyTemplate; private KafkaAwareTransactionManager transactionManager; @@ -75,6 +78,14 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { this.messageConverter = messageConverter; } + /** + * Set the {@link RecordFilterStrategy} to use to filter incoming records. + * @param recordFilterStrategy the record filter strategy + */ + void setRecordFilterStrategy(RecordFilterStrategy recordFilterStrategy) { + this.recordFilterStrategy = recordFilterStrategy; + } + /** * Set the {@link KafkaTemplate} to use to send replies. * @param replyTemplate the reply template @@ -151,6 +162,7 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { Listener properties = this.properties.getListener(); map.from(properties::getConcurrency).to(factory::setConcurrency); map.from(this.messageConverter).to(factory::setMessageConverter); + map.from(this.recordFilterStrategy).to(factory::setRecordFilterStrategy); map.from(this.replyTemplate).to(factory::setReplyTemplate); if (properties.getType().equals(Listener.Type.BATCH)) { factory.setBatchListener(true); diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java index 66c048db49..e11a6b1386 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java @@ -33,6 +33,7 @@ import org.springframework.kafka.listener.BatchErrorHandler; import org.springframework.kafka.listener.ConsumerAwareRebalanceListener; import org.springframework.kafka.listener.ErrorHandler; import org.springframework.kafka.listener.RecordInterceptor; +import org.springframework.kafka.listener.adapter.RecordFilterStrategy; import org.springframework.kafka.support.converter.BatchMessageConverter; import org.springframework.kafka.support.converter.BatchMessagingMessageConverter; import org.springframework.kafka.support.converter.MessageConverter; @@ -53,6 +54,8 @@ class KafkaAnnotationDrivenConfiguration { private final RecordMessageConverter messageConverter; + private final RecordFilterStrategy recordFilterStrategy; + private final BatchMessageConverter batchMessageConverter; private final KafkaTemplate kafkaTemplate; @@ -71,6 +74,7 @@ class KafkaAnnotationDrivenConfiguration { KafkaAnnotationDrivenConfiguration(KafkaProperties properties, ObjectProvider messageConverter, + ObjectProvider> recordFilterStrategy, ObjectProvider batchMessageConverter, ObjectProvider> kafkaTemplate, ObjectProvider> kafkaTransactionManager, @@ -80,6 +84,7 @@ class KafkaAnnotationDrivenConfiguration { ObjectProvider> recordInterceptor) { this.properties = properties; this.messageConverter = messageConverter.getIfUnique(); + this.recordFilterStrategy = recordFilterStrategy.getIfUnique(); this.batchMessageConverter = batchMessageConverter .getIfUnique(() -> new BatchMessagingMessageConverter(this.messageConverter)); this.kafkaTemplate = kafkaTemplate.getIfUnique(); @@ -99,6 +104,7 @@ class KafkaAnnotationDrivenConfiguration { MessageConverter messageConverterToUse = (this.properties.getListener().getType().equals(Type.BATCH)) ? this.batchMessageConverter : this.messageConverter; configurer.setMessageConverter(messageConverterToUse); + configurer.setRecordFilterStrategy(this.recordFilterStrategy); configurer.setReplyTemplate(this.kafkaTemplate); configurer.setTransactionManager(this.transactionManager); configurer.setRebalanceListener(this.rebalanceListener); diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index 24132b1c4d..176a9d7982 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -62,6 +62,7 @@ import org.springframework.kafka.listener.ContainerProperties.AckMode; import org.springframework.kafka.listener.RecordInterceptor; import org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler; import org.springframework.kafka.listener.SeekToCurrentErrorHandler; +import org.springframework.kafka.listener.adapter.RecordFilterStrategy; import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; import org.springframework.kafka.support.converter.BatchMessageConverter; import org.springframework.kafka.support.converter.BatchMessagingMessageConverter; @@ -464,6 +465,25 @@ class KafkaAutoConfigurationTests { }); } + @Test + void testConcurrentKafkaListenerContainerFactoryWithDefaultRecordFilterStrategy() { + this.contextRunner.run((context) -> { + ConcurrentKafkaListenerContainerFactory factory = context + .getBean(ConcurrentKafkaListenerContainerFactory.class); + assertThat(factory).hasFieldOrPropertyWithValue("recordFilterStrategy", null); + }); + } + + @Test + void testConcurrentKafkaListenerContainerFactoryWithCustomRecordFilterStrategy() { + this.contextRunner.withUserConfiguration(RecordFilterStrategyConfiguration.class).run((context) -> { + ConcurrentKafkaListenerContainerFactory factory = context + .getBean(ConcurrentKafkaListenerContainerFactory.class); + assertThat(factory).hasFieldOrPropertyWithValue("recordFilterStrategy", + context.getBean("recordFilterStrategy")); + }); + } + @Test void testConcurrentKafkaListenerContainerFactoryWithCustomErrorHandler() { this.contextRunner.withUserConfiguration(ErrorHandlerConfiguration.class).run((context) -> { @@ -608,6 +628,16 @@ class KafkaAutoConfigurationTests { } + @Configuration(proxyBeanMethods = false) + static class RecordFilterStrategyConfiguration { + + @Bean + RecordFilterStrategy recordFilterStrategy() { + return (record) -> false; + } + + } + @Configuration(proxyBeanMethods = false) static class ErrorHandlerConfiguration { diff --git a/spring-boot-project/spring-boot-docs/src/docs/asciidoc/spring-boot-features.adoc b/spring-boot-project/spring-boot-docs/src/docs/asciidoc/spring-boot-features.adoc index 3d4659b0dc..39b06def51 100644 --- a/spring-boot-project/spring-boot-docs/src/docs/asciidoc/spring-boot-features.adoc +++ b/spring-boot-project/spring-boot-docs/src/docs/asciidoc/spring-boot-features.adoc @@ -5725,7 +5725,7 @@ The following component creates a listener endpoint on the `someTopic` topic: ---- If a `KafkaTransactionManager` bean is defined, it is automatically associated to the container factory. -Similarly, if a `ErrorHandler`, `AfterRollbackProcessor` or `ConsumerAwareRebalanceListener` bean is defined, it is automatically associated to the default factory. +Similarly, if a `RecordFilterStrategy`, `ErrorHandler`, `AfterRollbackProcessor` or `ConsumerAwareRebalanceListener` bean is defined, it is automatically associated to the default factory. Depending on the listener type, a `RecordMessageConverter` or `BatchMessageConverter` bean is associated to the default factory. If only a `RecordMessageConverter` bean is present for a batch listener, it is wrapped in a `BatchMessageConverter`.