Consider RecordFilterStrategy in Kafka auto-configuration

See gh-22973
pull/22412/head
anshlykov 4 years ago committed by Stephane Nicoll
parent 943a6d2a2b
commit f68dfde35e

@ -33,6 +33,7 @@ import org.springframework.kafka.listener.BatchErrorHandler;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.listener.RecordInterceptor;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.kafka.support.converter.BatchMessageConverter;
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
import org.springframework.kafka.support.converter.MessageConverter;
@ -113,8 +114,10 @@ class KafkaAnnotationDrivenConfiguration {
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory) {
ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory,
ObjectProvider<RecordFilterStrategy<Object, Object>> kafkaFilterStrategyProvider) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
kafkaFilterStrategyProvider.ifAvailable(factory::setRecordFilterStrategy);
configurer.configure(factory, kafkaConsumerFactory
.getIfAvailable(() -> new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties())));
return factory;

@ -62,6 +62,7 @@ import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.listener.RecordInterceptor;
import org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.kafka.support.converter.BatchMessageConverter;
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
@ -588,6 +589,16 @@ class KafkaAutoConfigurationTests {
});
}
@Test
void testConcurrentKafkaListenerContainerFactoryWithCustomRecordFilterStrategy() {
this.contextRunner.withUserConfiguration(TestRecordFilterStrategyConfiguration.class).run((context) -> {
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
.getBean(ConcurrentKafkaListenerContainerFactory.class);
assertThat(factory).hasFieldOrPropertyWithValue("recordFilterStrategy",
context.getBean("recordFilterStrategy"));
});
}
@Configuration(proxyBeanMethods = false)
static class MessageConverterConfiguration {
@ -720,4 +731,15 @@ class KafkaAutoConfigurationTests {
}
@Configuration(proxyBeanMethods = false)
static class TestRecordFilterStrategyConfiguration {
@Bean
@SuppressWarnings("unchecked")
RecordFilterStrategy<Object, Object> recordFilterStrategy() {
return mock(RecordFilterStrategy.class);
}
}
}

Loading…
Cancel
Save